/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.CacheException;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
import javax.management.MBeanServer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.cdc.CdcManager;
import org.apache.ignite.internal.cdc.CdcUtilityActiveCdcManager;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.systemview.walker.CacheGroupIoViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.CachePagesListViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.PartitionStateViewWalker;
import org.apache.ignite.internal.metric.IoStatisticsType;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.FinishPreloadingTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionDefferedDeleteQueueCleanupTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cache.warmup.WarmUpStrategy;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.compress.CompressionHandler;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.processors.security.sandbox.IgniteSandbox;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.IgniteCollectors;
import org.apache.ignite.internal.util.InitializationProtector;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainClosure2;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.mxbean.IgniteMBeanAware;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.systemview.view.CacheGroupIoView;
import org.apache.ignite.spi.systemview.view.CachePagesListView;
import org.apache.ignite.spi.systemview.view.PartitionStateView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;

/**
 * The main responsibility of this processor is orchestrating of starting, stopping caches and everything related
 * to that activity like validation of cache configuration, restoring configuration from persistence, etc.
 */
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
public class GridCacheProcessor extends GridProcessorAdapter {
    /** */
    public static final String CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT =
        "Failed to perform %s operation (cluster is in read-only mode) [cacheGrp=%s, cache=%s]";

    /** */
    private static final String CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT =
        "Cannot start/stop cache within lock or transaction [cacheNames=%s, operation=%s]";

    /** System view name for page lists. */
    public static final String CACHE_GRP_PAGE_LIST_VIEW = "cacheGroupPageLists";

    /** System view description for page lists. */
    public static final String CACHE_GRP_PAGE_LIST_VIEW_DESC = "Cache group page lists";

    /** System view name for partition states. */
    public static final String PART_STATES_VIEW = "partitionStates";

    /** System view description for partition states. */
    public static final String PART_STATES_VIEW_DESC = "Distribution of cache group partitions across cluster nodes";

    /** System view name for cache group IO. */
    public static final String CACHE_GRP_IO_VIEW = metricName("local", "cache", "groups", "io");

    /** System view description for cache group IO. */
    public static final String CACHE_GRP_IO_VIEW_DESC = "Local node IO statistics for cache groups";

    /** @see IgniteSystemProperties#IGNITE_ALLOW_START_CACHES_IN_PARALLEL */
    public static final boolean DFLT_ALLOW_START_CACHES_IN_PARALLEL = true;

    /** Enables start caches in parallel. */
    private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL,
            DFLT_ALLOW_START_CACHES_IN_PARALLEL);

    /** */
    private final boolean keepStaticCacheConfiguration = IgniteSystemProperties.getBoolean(
        IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION);

    /**
     * Initial timeout (in milliseconds) for output the progress of restoring partitions status.
     * After the first output, the next ones will be output after value/5.
     * It is recommended to change this property only in tests.
     */
    static long TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS = TimeUnit.MINUTES.toMillis(5);

    /** Shared cache context. */
    private GridCacheSharedContext<?, ?> sharedCtx;

    /** */
    private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>();

    /** */
    private final Map<String, GridCacheAdapter<?, ?>> caches;

    /** Caches stopped from onKernalStop callback. */
    private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();

    /** Map of proxies. */
    private final ConcurrentHashMap<String, IgniteCacheProxyImpl<?, ?>> jCacheProxies;

    /** Transaction interface implementation. */
    private IgniteTransactionsImpl transactions;

    /** Pending cache operations. */
    private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();

    /** Template configuration add futures. */
    private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();

    /** Enable/disable cache statistics futures. */
    private ConcurrentMap<UUID, EnableStatisticsFuture> manageStatisticsFuts = new ConcurrentHashMap<>();

    /** */
    private ClusterCachesInfo cachesInfo;

    /** */
    private GridLocalConfigManager locCfgMgr;

    /** */
    private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();

    /** Must use JDK marsh since it is used by discovery to fire custom events. */
    private final Marshaller marsh;

    /** Count down latch for caches. */
    private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);

    /** Internal cache names. */
    private final Set<String> internalCaches;

    /** Protector of initialization of specific value. */
    private final InitializationProtector initializationProtector = new InitializationProtector();

    /** Cache recovery lifecycle state and actions. */
    private final CacheRecoveryLifecycle recovery = new CacheRecoveryLifecycle();

    /** Cache configuration splitter. */
    private CacheConfigurationSplitter splitter;

    /** Cache configuration enricher. */
    private CacheConfigurationEnricher enricher;

    /** */
    public static final String EXPRITY_POLICY_MSG = "expiryPolicy=[type=%s, isEagerTtl=%s]";

    /**
     * @param ctx Kernal context.
     */
    public GridCacheProcessor(GridKernalContext ctx) {
        super(ctx);

        caches = new ConcurrentHashMap<>();
        jCacheProxies = new ConcurrentHashMap<>();
        internalCaches = new HashSet<>();

        marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
        splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
        enricher = new CacheConfigurationEnricher(ctx, marsh, U.resolveClassLoader(ctx.config()));
    }

    /**
     * @param cfg Configuration to check for possible performance issues.
     * @param hasStore {@code True} if store is configured.
     */
    private void suggestOptimizations(CacheConfiguration cfg, boolean hasStore) {
        GridPerformanceSuggestions perf = ctx.performance();

        String msg = "Disable eviction policy (remove from configuration)";

        if (cfg.getEvictionPolicyFactory() != null || cfg.getEvictionPolicy() != null)
            perf.add(msg, false);
        else
            perf.add(msg, true);

        if (cfg.getCacheMode() == PARTITIONED)
            perf.add("Disable near cache (set 'nearConfiguration' to null)", cfg.getNearConfiguration() == null);

        // Suppress warning if at least one ATOMIC cache found.
        perf.add("Enable ATOMIC mode if not using transactions (set 'atomicityMode' to ATOMIC)",
            cfg.getAtomicityMode() == ATOMIC);

        // Suppress warning if at least one non-FULL_SYNC mode found.
        perf.add("Disable fully synchronous writes (set 'writeSynchronizationMode' to PRIMARY_SYNC or FULL_ASYNC)",
            cfg.getWriteSynchronizationMode() != FULL_SYNC);

        if (hasStore && cfg.isWriteThrough())
            perf.add("Enable write-behind to persistent store (set 'writeBehindEnabled' to true)",
                cfg.isWriteBehindEnabled());
    }

    /**
     * Create exchange worker task for custom discovery message.
     *
     * @param msg Custom discovery message.
     * @return Task or {@code null} if message doesn't require any special processing.
     */
    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
        if (msg instanceof SchemaAbstractDiscoveryMessage) {
            SchemaAbstractDiscoveryMessage msg0 = (SchemaAbstractDiscoveryMessage)msg;

            if (msg0.exchange())
                return new SchemaExchangeWorkerTask(remoteSecurityContext(ctx), msg0);
        }
        else if (msg instanceof ClientCacheChangeDummyDiscoveryMessage) {
            ClientCacheChangeDummyDiscoveryMessage msg0 = (ClientCacheChangeDummyDiscoveryMessage)msg;

            return msg0;
        }
        else if (msg instanceof CacheStatisticsModeChangeMessage) {
            CacheStatisticsModeChangeMessage msg0 = (CacheStatisticsModeChangeMessage)msg;

            if (msg0.initial())
                return new CacheStatisticsModeChangeTask(remoteSecurityContext(ctx), msg0);
        }

        return null;
    }

    /**
     * Process custom exchange task.
     *
     * @param task Task.
     */
    void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
        if (task instanceof SchemaExchangeWorkerTask) {
            SchemaAbstractDiscoveryMessage msg = ((SchemaExchangeWorkerTask)task).message();

            if (msg instanceof SchemaProposeDiscoveryMessage) {
                SchemaProposeDiscoveryMessage msg0 = (SchemaProposeDiscoveryMessage)msg;

                ctx.query().onSchemaPropose(msg0);
            }
            else
                U.warn(log, "Unsupported schema discovery message: " + msg);
        }
        else if (task instanceof SchemaNodeLeaveExchangeWorkerTask) {
            SchemaNodeLeaveExchangeWorkerTask task0 = (SchemaNodeLeaveExchangeWorkerTask)task;

            ctx.query().onNodeLeave(task0.node());
        }
        else if (task instanceof ClientCacheChangeDummyDiscoveryMessage) {
            ClientCacheChangeDummyDiscoveryMessage task0 = (ClientCacheChangeDummyDiscoveryMessage)task;

            sharedCtx.affinity().processClientCachesRequests(task0);
        }
        else if (task instanceof ClientCacheUpdateTimeout) {
            ClientCacheUpdateTimeout task0 = (ClientCacheUpdateTimeout)task;

            sharedCtx.affinity().sendClientCacheChangesMessage(task0);
        }
        else if (task instanceof CacheStatisticsModeChangeTask) {
            CacheStatisticsModeChangeTask task0 = (CacheStatisticsModeChangeTask)task;

            processStatisticsModeChange(task0.message());
        }
        else if (task instanceof TxTimeoutOnPartitionMapExchangeChangeTask) {
            TxTimeoutOnPartitionMapExchangeChangeTask task0 = (TxTimeoutOnPartitionMapExchangeChangeTask)task;

            sharedCtx.tm().processTxTimeoutOnPartitionMapExchangeChange(task0.message());
        }
        else if (task instanceof StopCachesOnClientReconnectExchangeTask) {
            StopCachesOnClientReconnectExchangeTask task0 = (StopCachesOnClientReconnectExchangeTask)task;

            stopCachesOnClientReconnect(task0.stoppedCaches());

            task0.onDone();
        }
        else if (task instanceof WalStateNodeLeaveExchangeTask) {
            WalStateNodeLeaveExchangeTask task0 = (WalStateNodeLeaveExchangeTask)task;

            sharedCtx.walState().onNodeLeft(task0.node().id());
        }
        else if (task instanceof FinishPreloadingTask) {
            FinishPreloadingTask task0 = (FinishPreloadingTask)task;

            CacheGroupContext grp = cacheGroup(task0.groupId());

            if (grp != null)
                grp.preloader().finishPreloading(task0.topologyVersion(), task0.rebalanceId());
        }
        else
            U.warn(log, "Unsupported custom exchange task: " + task);
    }

    /**
     * @param ctx Context.
     * @return DHT managers.
     */
    private List<GridCacheManager> dhtManagers(GridCacheContext ctx) {
        return F.asList(ctx.store(), ctx.events(), ctx.evicts(), ctx.queries(), ctx.continuousQueries(),
            ctx.dr());
    }

    /**
     * @param ctx Context.
     * @return Managers present in both, DHT and Near caches.
     */
    @SuppressWarnings("IfMayBeConditional")
    private Collection<GridCacheManager> dhtExcludes(GridCacheContext ctx) {
        if (!isNearEnabled(ctx))
            return Collections.emptyList();
        else
            return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store());
    }

    /**
     * @param cfg Configuration.
     * @param objs Extra components.
     * @throws IgniteCheckedException If failed to inject.
     */
    private void prepare(CacheConfiguration cfg, Collection<Object> objs) throws IgniteCheckedException {
        prepare(cfg, cfg.getAffinity(), false);
        prepare(cfg, cfg.getAffinityMapper(), false);
        prepare(cfg, cfg.getEvictionFilter(), false);
        prepare(cfg, cfg.getInterceptor(), false);

        for (Object obj : objs)
            prepare(cfg, obj, false);
    }

    /**
     * @param cfg Cache configuration.
     * @param rsrc Resource.
     * @param near Near flag.
     * @throws IgniteCheckedException If failed.
     */
    private void prepare(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) throws IgniteCheckedException {
        if (rsrc != null) {
            ctx.resource().injectGeneric(rsrc);

            ctx.resource().injectCacheName(rsrc, cfg.getName());

            registerMbean(rsrc, cfg.getName(), near);
        }
    }

    /**
     * @param cctx Cache context.
     */
    private void cleanup(GridCacheContext cctx) {
        CacheConfiguration cfg = cctx.config();

        cleanup(cfg, cfg.getAffinity(), false);
        cleanup(cfg, cfg.getAffinityMapper(), false);
        cleanup(cfg, cfg.getEvictionFilter(), false);
        cleanup(cfg, cfg.getInterceptor(), false);
        cleanup(cfg, cctx.store().configuredStore(), false);

        cctx.cleanup();
    }

    /**
     * @param grp Cache group.
     * @param destroy Group destroy flag.
     */
    private void cleanup(CacheGroupContext grp, boolean destroy) {
        CacheConfiguration cfg = grp.config();

        for (Object obj : grp.configuredUserObjects())
            cleanup(cfg, obj, false);

        grp.metrics().remove(destroy);

        grp.removeIOStatistic(destroy);

        sharedCtx.evict().cleanupRemovedGroup(grp.groupId());
    }

    /**
     * @param cfg Cache configuration.
     * @param rsrc Resource.
     * @param near Near flag.
     */
    private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) {
        if (rsrc != null) {
            unregisterMbean(rsrc, cfg.getName(), near);

            try {
                ctx.resource().cleanupGeneric(rsrc);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to cleanup resource: " + rsrc, e);
            }
        }
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"unchecked"})
    @Override public void start() throws IgniteCheckedException {
        ctx.internalSubscriptionProcessor().registerMetastorageListener(recovery);
        ctx.internalSubscriptionProcessor().registerDatabaseListener(recovery);

        cachesInfo = new ClusterCachesInfo(ctx);

        DeploymentMode depMode = ctx.config().getDeploymentMode();

        if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
            if (depMode != CONTINUOUS && depMode != SHARED)
                U.warn(log, "Deployment mode for cache is not CONTINUOUS or SHARED " +
                    "(it is recommended that you change deployment mode and restart): " + depMode);
        }

        Collection<CacheStoreSessionListener> sesListeners =
            CU.startStoreSessionListeners(ctx, ctx.config().getCacheStoreSessionListenerFactories());

        sharedCtx = createSharedContext(ctx, sesListeners);

        locCfgMgr = new GridLocalConfigManager(this, ctx);

        transactions = new IgniteTransactionsImpl(sharedCtx, null, false);

        // Start shared managers.
        for (GridCacheSharedManager mgr : sharedCtx.managers())
            mgr.start(sharedCtx);

        if (!CU.isPersistenceEnabled(ctx.config()) || ctx.config().isClientMode()) {
            CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();

            if (data != null)
                cachesInfo.onStart(data);
        }

        if (log.isDebugEnabled())
            log.debug("Started cache processor.");

        ctx.state().cacheProcessorStarted();

        ctx.systemView().registerFiltrableView(
            CACHE_GRP_PAGE_LIST_VIEW,
            CACHE_GRP_PAGE_LIST_VIEW_DESC,
            new CachePagesListViewWalker(),
            this::pagesListViewSupplier,
            Function.identity()
        );

        ctx.systemView().registerFiltrableView(
            PART_STATES_VIEW,
            PART_STATES_VIEW_DESC,
            new PartitionStateViewWalker(),
            this::partStatesViewSupplier,
            Function.identity()
        );

        ctx.systemView().registerView(
            CACHE_GRP_IO_VIEW,
            CACHE_GRP_IO_VIEW_DESC,
            new CacheGroupIoViewWalker(),
            () -> F.view(cacheGrps.values(), grp -> !grp.systemCache()),
            grpCtx -> {
                MetricRegistry mreg = ctx.metric().registry(metricName(IoStatisticsType.CACHE_GROUP.metricGroupName(),
                    grpCtx.cacheOrGroupName()));

                return new CacheGroupIoView(grpCtx, mreg);
            }
        );
    }

    /**
     * @param cfg Initializes cache configuration with proper defaults.
     * @param cacheObjCtx Cache object context.
     * @throws IgniteCheckedException If configuration is not valid.
     */
    void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
        CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
    }

    /**
     * @param grpId Group ID.
     * @return Cache group.
     */
    @Nullable public CacheGroupContext cacheGroup(int grpId) {
        return cacheGrps.get(grpId);
    }

    /**
     * @return Cache groups.
     */
    public Collection<CacheGroupContext> cacheGroups() {
        return cacheGrps.values();
    }

    /** {@inheritDoc} */
    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
        AffinityTopologyVersion joinVer;

        try {
            boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);

            if (checkConsistency)
                ValidationOnNodeJoinUtils.checkConsistency(ctx, log);

            cachesInfo.onKernalStart(checkConsistency);

            sharedCtx.walState().onKernalStart();

            ctx.query().onCacheKernalStart();

            joinVer = sharedCtx.exchange().onKernalStart(active, false);
        }
        finally {
            cacheStartedLatch.countDown();
        }

        if (!ctx.clientNode())
            sharedCtx.time().addTimeoutObject(new PartitionDefferedDeleteQueueCleanupTask(
                sharedCtx, Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, DFLT_CACHE_REMOVE_ENTRIES_TTL)));

        // Notify shared managers.
        for (GridCacheSharedManager mgr : sharedCtx.managers())
            mgr.onKernalStart(active);

        // Escape if cluster inactive.
        if (!active)
            return;

        awaitRebalance(joinVer).get();
    }

    /**
     * Await for local join finish and caches start.
     *
     * @throws IgniteCheckedException If failed to wait.
     */
    public void awaitStarted() throws IgniteCheckedException {
        U.await(cacheStartedLatch);
    }

    /**
     * Await rebalance for caches with SYNC rebalance mode started on local join exchange.
     *
     * @param joinVer Topology version of local join.
     * @return Future indicates that rebalance for SYNC rebalance mode caches has completed.
     */
    private GridCompoundFuture<?, ?> awaitRebalance(AffinityTopologyVersion joinVer) {
        return internalCaches().stream()
            .map(GridCacheAdapter::context)
            .filter(GridCacheContext::affinityNode) // Only affinity caches.
            .filter(ctx -> ctx.config().getRebalanceMode() == SYNC) // Only caches with SYNC mode.
            .filter(ctx -> ctx.startTopologyVersion().equals(joinVer)) // Only caches started on local join.
            .filter(ctx -> ctx.config().getCacheMode() == REPLICATED // Caches without manual rebalance.
                || ctx.config().getCacheMode() == PARTITIONED && ctx.config().getRebalanceDelay() >= 0)
            .map(ctx -> ctx.preloader().syncFuture())
            .collect(IgniteCollectors.toCompoundFuture());
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) throws IgniteCheckedException {
        stopCaches(cancel);

        List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();

        for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
            GridCacheSharedManager<?, ?> mgr = it.previous();

            mgr.stop(cancel);
        }

        CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());

        sharedCtx.cleanup();

        if (log.isDebugEnabled())
            log.debug("Stopped cache processor.");
    }

    /**
     * @param cancel Cancel.
     */
    public void stopCaches(boolean cancel) {
        for (String cacheName : locCfgMgr.stopSequence()) {
            GridCacheAdapter<?, ?> cache = stoppedCaches.remove(cacheName);

            if (cache != null)
                stopCache(cache, cancel, false, false);
        }

        for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) {
            if (cache == stoppedCaches.remove(cache.name()))
                stopCache(cache, cancel, false, false);
        }

        for (CacheGroupContext grp : cacheGrps.values())
            stopCacheGroup(grp.groupId(), false);
    }

    /**
     * Blocks all available gateways
     */
    public void blockGateways() {
        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values())
            proxy.context0().gate().onStopped();
    }

    /**
     * Blocks (stops) cache gateway for caches according to given {@code cacheGroupIds}.
     *
     * @param cacheGrpIds Cache group ids for which cache gateway should be stopped.
     * @return Caches for which cache gateway is blocked (stopped).
     */
    public List<GridCacheAdapter> blockGateways(Collection<Integer> cacheGrpIds) {
        List<GridCacheAdapter> affectedCaches = internalCaches().stream()
            .filter(cache -> cacheGrpIds.contains(cache.context().groupId()))
            .collect(toList());

        affectedCaches.forEach(cache -> {
            // Add proxy if it's not initialized.
            addjCacheProxy(cache.context().name(), new IgniteCacheProxyImpl(cache.context(), cache, false));

            // Stop proxy.
            blockGateway(cache.context().name(), true, false);
        });

        return affectedCaches;
    }

    /** {@inheritDoc} */
    @Override public void onKernalStop(boolean cancel) {
        cacheStartedLatch.countDown();

        GridCachePartitionExchangeManager<Object, Object> exch = context().exchange();

        // Stop exchange manager first so that we call onKernalStop on all caches.
        // No new caches should be added after this point.
        exch.onKernalStop(cancel);

        sharedCtx.mvcc().onStop();

        for (CacheGroupContext grp : cacheGrps.values())
            grp.onKernalStop();

        onKernalStopCaches(cancel);

        cancelFutures();

        List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();

        for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
            it.hasPrevious(); ) {
            GridCacheSharedManager<?, ?> mgr = it.previous();

            if (mgr != exch)
                mgr.onKernalStop(cancel);
        }
    }

    /**
     * @param cancel Cancel.
     */
    public void onKernalStopCaches(boolean cancel) {
        IgniteCheckedException affErr =
            new IgniteCheckedException("Failed to wait for topology update, node is stopping.");

        for (CacheGroupContext grp : cacheGrps.values()) {
            GridAffinityAssignmentCache aff = grp.affinity();

            aff.cancelFutures(affErr);
        }

        for (String cacheName : locCfgMgr.stopSequence()) {
            GridCacheAdapter<?, ?> cache = caches.remove(cacheName);

            if (cache != null) {
                stoppedCaches.put(cacheName, cache);

                onKernalStop(cache, cancel);
            }
        }

        for (Map.Entry<String, GridCacheAdapter<?, ?>> entry : caches.entrySet()) {
            GridCacheAdapter<?, ?> cache = entry.getValue();

            if (cache == caches.remove(entry.getKey())) {
                stoppedCaches.put(entry.getKey(), cache);

                onKernalStop(entry.getValue(), cancel);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
            ctx.cluster().clientReconnectFuture(),
            "Failed to execute dynamic cache change request, client node disconnected.");

        for (IgniteInternalFuture fut : pendingFuts.values())
            ((GridFutureAdapter)fut).onDone(err);

        for (IgniteInternalFuture fut : pendingTemplateFuts.values())
            ((GridFutureAdapter)fut).onDone(err);

        for (EnableStatisticsFuture fut : manageStatisticsFuts.values())
            fut.onDone(err);

        for (CacheGroupContext grp : cacheGrps.values())
            grp.onDisconnected(reconnectFut);

        for (GridCacheAdapter cache : caches.values()) {
            GridCacheContext cctx = cache.context();

            cctx.gate().onDisconnected(reconnectFut);

            List<GridCacheManager> mgrs = cache.context().managers();

            for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
                GridCacheManager mgr = it.previous();

                mgr.onDisconnected(reconnectFut);
            }
        }

        sharedCtx.onDisconnected(reconnectFut);

        cachesInfo.onDisconnected();
    }

    /**
     * @param cctx Cache context.
     * @param stoppedCaches List where stopped cache should be added.
     */
    private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter> stoppedCaches) {
        cctx.gate().reconnected(true);

        sharedCtx.removeCacheContext(cctx);

        caches.remove(cctx.name());

        completeProxyInitialize(cctx.name());

        jCacheProxies.remove(cctx.name());

        stoppedCaches.add(cctx.cache());
    }

    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
        List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());

        DiscoveryDataClusterState state = ctx.state().clusterState();

        boolean active = state.active() && !state.transition();

        ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(active, state.transition());

        final List<GridCacheAdapter> stoppedCaches = new ArrayList<>();

        // Close SQL caches which were not started by client.
        for (String cacheToStop: reconnectRes.stoppedCaches()) {
            if (!caches.keySet().contains(cacheToStop))
                ctx.query().onCacheStop(cacheToStop);
        }

        for (final GridCacheAdapter cache : caches.values()) {
            boolean stopped = reconnectRes.stoppedCacheGroups().contains(cache.context().groupId())
                || reconnectRes.stoppedCaches().contains(cache.name());

            if (stopped)
                stopCacheOnReconnect(cache.context(), stoppedCaches);
            else {
                cache.onReconnected();

                reconnected.add(cache);

                if (cache.context().userCache()) {
                    DynamicCacheDescriptor desc = cacheDescriptor(cache.name());

                    assert desc != null : cache.name();

                    if (!QueryUtils.isEnabled(cache.context().config())
                            && QueryUtils.isEnabled(desc.cacheConfiguration())) {
                        CacheConfiguration newCfg = desc.cacheConfiguration();

                        cache.context().onSchemaAddQueryEntity(newCfg.getQueryEntities(), newCfg.getSqlSchema(),
                                newCfg.isSqlEscapeAll(), newCfg.getQueryParallelism());
                    }

                    boolean rmvIdx = !cache.context().group().persistenceEnabled();

                    // Re-create cache structures inside indexing in order to apply recent schema changes.
                    GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);

                    ctx.query().onCacheStop0(cacheInfo, rmvIdx, rmvIdx);
                    ctx.query().onCacheStart0(cacheInfo, desc.schema(), desc.sql());
                }
            }
        }

        final Set<Integer> stoppedGrps = reconnectRes.stoppedCacheGroups();

        for (CacheGroupContext grp : cacheGrps.values()) {
            if (stoppedGrps.contains(grp.groupId()))
                cacheGrps.remove(grp.groupId());
            else
                grp.onReconnected();
        }

        sharedCtx.onReconnected(active);

        for (GridCacheAdapter cache : reconnected)
            cache.context().gate().reconnected(false);

        if (!stoppedCaches.isEmpty())
            return sharedCtx.exchange().deferStopCachesOnClientReconnect(stoppedCaches);

        return null;
    }

    /**
     * @param cache Cache to stop.
     * @param cancel Cancel flag.
     * @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
     * @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
     */
    private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel, boolean callDestroy, boolean clearCache) {
        stopCache(cache, cancel, callDestroy, clearCache, true);
    }

    /**
     * @param cache Cache to stop.
     * @param cancel Cancel flag.
     * @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
     * @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
     * @param clearDbObjects If {@code false} DB objects don't removed (used for cache.close() on client node).
     */
    @SuppressWarnings({"unchecked"})
    private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel, boolean callDestroy, boolean clearCache, boolean clearDbObjects) {
        GridCacheContext ctx = cache.context();

        try {
            if (!cache.isNear() && ctx.shared().wal() != null) {
                try {
                    ctx.shared().wal().flush(null, false);
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to flush write-ahead log on cache stop " +
                        "[cache=" + ctx.name() + "]", e);
                }
            }

            sharedCtx.removeCacheContext(ctx);

            cache.stop();

            cache.removeMetrics(callDestroy);

            GridCacheContextInfo cacheInfo = new GridCacheContextInfo(ctx, false);

            if (clearDbObjects) {
                boolean rmvIdx = !cache.context().group().persistenceEnabled() || callDestroy;
                boolean clearIdx = !cache.context().group().persistenceEnabled() || clearCache;
                ctx.kernalContext().query().onCacheStop(cacheInfo, rmvIdx, clearIdx);
            }
            else
                ctx.kernalContext().query().onClientCacheStop(cacheInfo);

            if (isNearEnabled(ctx)) {
                GridDhtCacheAdapter dht = ctx.near().dht();

                // Check whether dht cache has been started.
                if (dht != null) {
                    dht.stop();

                    dht.removeMetrics(callDestroy);

                    GridCacheContext<?, ?> dhtCtx = dht.context();

                    List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx);

                    for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious(); ) {
                        GridCacheManager mgr = it.previous();

                        mgr.stop(cancel, callDestroy);
                    }
                }
            }

            List<GridCacheManager> mgrs = ctx.managers();

            Collection<GridCacheManager> excludes = dhtExcludes(ctx);

            // Reverse order.
            for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
                GridCacheManager mgr = it.previous();

                if (!excludes.contains(mgr))
                    mgr.stop(cancel, callDestroy);
            }

            ctx.kernalContext().continuous().onCacheStop(ctx);

            ctx.group().stopCache(ctx, clearCache);

            U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore()));

            if (callDestroy)
                removeCacheConfig(ctx.config());

            if (log.isInfoEnabled()) {
                if (ctx.group().sharedGroup())
                    log.info("Stopped cache [cacheName=" + cache.name() + ", group=" + ctx.group().name() + ']');
                else
                    log.info("Stopped cache [cacheName=" + cache.name() + ']');
            }
        }
        finally {
            cleanup(ctx);
        }
    }

    /** */
    private void removeCacheConfig(CacheConfiguration<?, ?> cacheCfg) {
        if (CU.storeCacheConfig(sharedCtx, cacheCfg)) {
            try {
                locCfgMgr.removeCacheData(new StoredCacheData(cacheCfg));
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to delete cache configuration data while destroying cache" +
                    "[cache=" + cacheCfg.getName() + "]", e);
            }
        }
    }

    /**
     * @param cache Cache.
     * @throws IgniteCheckedException If failed.
     */
    private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
        GridCacheContext<?, ?> ctx = cache.context();

        // Start DHT cache as well.
        if (isNearEnabled(ctx)) {
            GridDhtCacheAdapter dht = ctx.near().dht();

            GridCacheContext<?, ?> dhtCtx = dht.context();

            for (GridCacheManager mgr : dhtManagers(dhtCtx))
                mgr.onKernalStart();

            dht.onKernalStart();

            if (log.isDebugEnabled())
                log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name());
        }

        for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx))))
            mgr.onKernalStart();

        cache.onKernalStart();

        if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED))
            ctx.events().addEvent(EventType.EVT_CACHE_STARTED);

        if (log.isDebugEnabled())
            log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" +
                cache.configuration().getCacheMode() + ']');
    }

    /**
     * @param cache Cache to stop.
     * @param cancel Cancel flag.
     */
    @SuppressWarnings("unchecked")
    private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
        GridCacheContext ctx = cache.context();

        if (isNearEnabled(ctx)) {
            GridDhtCacheAdapter dht = ctx.near().dht();

            if (dht != null) {
                GridCacheContext<?, ?> dhtCtx = dht.context();

                for (GridCacheManager mgr : dhtManagers(dhtCtx))
                    mgr.onKernalStop(cancel);

                dht.onKernalStop();
            }
        }

        List<GridCacheManager> mgrs = ctx.managers();

        Collection<GridCacheManager> excludes = dhtExcludes(ctx);

        // Reverse order.
        for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
            GridCacheManager mgr = it.previous();

            if (!excludes.contains(mgr))
                mgr.onKernalStop(cancel);
        }

        cache.onKernalStop();

        if (!ctx.isRecoveryMode() && ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED))
            ctx.events().addEvent(EventType.EVT_CACHE_STOPPED);
    }

    /**
     * @param cfg Cache configuration to use to create cache.
     * @param grp Cache group.
     * @param pluginMgr Cache plugin manager.
     * @param desc Cache descriptor.
     * @param locStartTopVer Current topology version.
     * @param cacheObjCtx Cache object context.
     * @param affNode {@code True} if local node affinity node.
     * @param updatesAllowed Updates allowed flag.
     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will
     * change state of proxies to restarting
     * @return Cache context.
     * @throws IgniteCheckedException If failed to create cache.
     */
    private GridCacheContext<?, ?> createCacheContext(
        CacheConfiguration<?, ?> cfg,
        CacheGroupContext grp,
        @Nullable CachePluginManager pluginMgr,
        DynamicCacheDescriptor desc,
        AffinityTopologyVersion locStartTopVer,
        CacheObjectContext cacheObjCtx,
        boolean affNode,
        boolean updatesAllowed,
        boolean disabledAfterStart,
        boolean recoveryMode
    ) throws IgniteCheckedException {
        assert cfg != null;

        if (cfg.getCacheStoreFactory() instanceof GridCacheLoaderWriterStoreFactory) {
            GridCacheLoaderWriterStoreFactory factory = (GridCacheLoaderWriterStoreFactory)cfg.getCacheStoreFactory();

            prepare(cfg, factory.loaderFactory(), false);
            prepare(cfg, factory.writerFactory(), false);
        }
        else
            prepare(cfg, cfg.getCacheStoreFactory(), false);

        CacheStore cfgStore = null;

        if (cfg.getCacheStoreFactory() != null) {
            IgniteSandbox sandbox = ctx.security().sandbox();

            cfgStore = sandbox.enabled() ?
                sandbox.execute(() -> cfg.getCacheStoreFactory().create()) : cfg.getCacheStoreFactory().create();
        }

        ValidationOnNodeJoinUtils.validate(ctx.config(), cfg, desc.cacheType(), cfgStore, ctx, log, (x, y) -> {
            try {
                assertParameter(x, y);
            }
            catch (IgniteCheckedException ex) {
                return ex;
            }

            return null;
        });

        if (pluginMgr == null)
            pluginMgr = new CachePluginManager(ctx, cfg);

        pluginMgr.validate();

        sharedCtx.jta().registerCache(cfg);

        // Skip suggestions for internal caches.
        if (desc.cacheType().userCache())
            suggestOptimizations(cfg, cfgStore != null);

        Collection<Object> toPrepare = new ArrayList<>();

        if (cfgStore instanceof GridCacheLoaderWriterStore) {
            toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader());
            toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer());
        }
        else
            toPrepare.add(cfgStore);

        prepare(cfg, toPrepare);

        U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore));

        boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);

        GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
        GridCacheEventManager evtMgr = new GridCacheEventManager();
        CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled())
            ? new GridCacheEvictionManager()
            : new CacheOffheapEvictionManager();
        GridCacheQueryManager qryMgr = new GridCacheDistributedQueryManager();
        CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
        CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
        GridCacheTtlManager ttlMgr = new GridCacheTtlManager();

        CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class);
        GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
        CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class);
        PlatformCacheManager platformMgr = ctx.platform().cacheManager();

        if (cfgStore == null)
            storeMgr.initialize(cfgStore, sesHolders);
        else {
            final CacheStore cfgStoreRef = cfgStore;

            initializationProtector.protect(
                cfgStore,
                () -> storeMgr.initialize(cfgStoreRef, sesHolders)
            );
        }

        GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
            ctx,
            sharedCtx,
            cfg,
            grp,
            desc.cacheType(),
            locStartTopVer,
            desc.deploymentId(),
            affNode,
            updatesAllowed,
            desc.cacheConfiguration().isStatisticsEnabled(),
            recoveryMode,
            /*
             * Managers in starting order!
             * ===========================
             */
            evtMgr,
            storeMgr,
            evictMgr,
            qryMgr,
            contQryMgr,
            dataStructuresMgr,
            ttlMgr,
            drMgr,
            rslvrMgr,
            pluginMgr,
            affMgr,
            platformMgr
        );

        cacheCtx.cacheObjectContext(cacheObjCtx);

        GridCacheAdapter cache = null;

        switch (cfg.getCacheMode()) {
            case PARTITIONED:
            case REPLICATED: {
                if (nearEnabled) {
                    switch (cfg.getAtomicityMode()) {
                        case TRANSACTIONAL: {
                            cache = new GridNearTransactionalCache(cacheCtx);

                            break;
                        }
                        case ATOMIC: {
                            cache = new GridNearAtomicCache(cacheCtx);

                            break;
                        }

                        default: {
                            assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
                        }
                    }
                }
                else {
                    switch (cfg.getAtomicityMode()) {
                        case TRANSACTIONAL: {
                            cache = cacheCtx.affinityNode() ?
                                new GridDhtColocatedCache(cacheCtx) :
                                new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap());

                            break;
                        }
                        case ATOMIC: {
                            cache = cacheCtx.affinityNode() ?
                                new GridDhtAtomicCache(cacheCtx) :
                                new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());

                            break;
                        }

                        default: {
                            assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
                        }
                    }
                }

                break;
            }

            default: {
                assert false : "Invalid cache mode: " + cfg.getCacheMode();
            }
        }

        cache.active(!disabledAfterStart);

        cacheCtx.cache(cache);

        GridCacheContext<?, ?> ret = cacheCtx;

        /*
         * Create DHT cache.
         * ================
         */
        if (nearEnabled) {
            /*
             * Specifically don't create the following managers
             * here and reuse the one from Near cache:
             * 1. GridCacheVersionManager
             * 2. GridCacheIoManager
             * 3. GridCacheDeploymentManager
             * 4. GridCacheQueryManager (note, that we start it for DHT cache though).
             * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
             * 6. GridCacheDgcManager
             * 7. GridCacheTtlManager.
             * 8. PlatformCacheManager.
             * ===============================================
             */
            evictMgr = cfg.isOnheapCacheEnabled() ? new GridCacheEvictionManager() : new CacheOffheapEvictionManager();
            evtMgr = new GridCacheEventManager();
            pluginMgr = new CachePluginManager(ctx, cfg);
            drMgr = pluginMgr.createComponent(GridCacheDrManager.class);

            cacheCtx = new GridCacheContext(
                ctx,
                sharedCtx,
                cfg,
                grp,
                desc.cacheType(),
                locStartTopVer,
                desc.deploymentId(),
                affNode,
                true,
                desc.cacheConfiguration().isStatisticsEnabled(),
                recoveryMode,
                /*
                 * Managers in starting order!
                 * ===========================
                 */
                evtMgr,
                storeMgr,
                evictMgr,
                qryMgr,
                contQryMgr,
                dataStructuresMgr,
                ttlMgr,
                drMgr,
                rslvrMgr,
                pluginMgr,
                affMgr,
                platformMgr
            );

            cacheCtx.cacheObjectContext(cacheObjCtx);

            GridDhtCacheAdapter dht = null;

            switch (cfg.getAtomicityMode()) {
                case TRANSACTIONAL: {
                    assert cache instanceof GridNearTransactionalCache;

                    GridNearTransactionalCache near = (GridNearTransactionalCache)cache;

                    GridDhtCache dhtCache = cacheCtx.affinityNode() ?
                        new GridDhtCache(cacheCtx) :
                        new GridDhtCache(cacheCtx, new GridNoStorageCacheMap());

                    dhtCache.near(near);

                    near.dht(dhtCache);

                    dht = dhtCache;

                    break;
                }
                case ATOMIC: {
                    assert cache instanceof GridNearAtomicCache;

                    GridNearAtomicCache near = (GridNearAtomicCache)cache;

                    GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ?
                        new GridDhtAtomicCache(cacheCtx) :
                        new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());

                    dhtCache.near(near);

                    near.dht(dhtCache);

                    dht = dhtCache;

                    break;
                }

                default: {
                    assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
                }
            }

            cacheCtx.cache(dht);
        }

        return ret;
    }

    /**
     * @param reqs Cache requests to start.
     * @param fut Completable future.
     */
    public void registrateProxyRestart(Map<String, DynamicCacheChangeRequest> reqs, GridFutureAdapter<?> fut) {
        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
            if (reqs.containsKey(proxy.getName()) &&
                proxy.isRestarting() &&
                !reqs.get(proxy.getName()).disabledAfterStart()
            )
                proxy.registrateFutureRestart(fut);
        }
    }

    /**
     * @param reqs Cache requests to start.
     * @param initVer Init exchange version.
     * @param doneVer Finish excahnge vertison.
     */
    public void completeProxyRestart(
        Map<String, DynamicCacheChangeRequest> reqs,
        AffinityTopologyVersion initVer,
        AffinityTopologyVersion doneVer
    ) {
        if (initVer == null || doneVer == null)
            return;

        for (GridCacheAdapter<?, ?> cache : caches.values()) {
            GridCacheContext<?, ?> cacheCtx = cache.context();

            if (reqs.containsKey(cache.name()) ||
                (cacheCtx.startTopologyVersion().compareTo(initVer) <= 0 ||
                    cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0))
                completeProxyInitialize(cache.name());

            if (
                cacheCtx.startTopologyVersion().compareTo(initVer) >= 0 &&
                    cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0
            ) {
                IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cache.name());

                boolean canRestart = Optional.ofNullable(reqs.get(cache.name()))
                    .map(req -> !req.disabledAfterStart())
                    .orElse(true);

                if (proxy != null && proxy.isRestarting() && canRestart) {
                    proxy.onRestarted(cacheCtx, cache);

                    if (cacheCtx.dataStructuresCache())
                        ctx.dataStructures().restart(cache.name(), proxy.internalProxy());
                }
            }
        }
    }

    /**
     * Gets a collection of currently started caches.
     *
     * @return Collection of started cache names.
     */
    public Collection<String> cacheNames() {
        return F.viewReadOnly(cacheDescriptors().values(),
            new IgniteClosure<DynamicCacheDescriptor, String>() {
                @Override public String apply(DynamicCacheDescriptor desc) {
                    return desc.cacheConfiguration().getName();
                }
            });
    }

    /**
     * Gets public cache that can be used for query execution. If cache isn't created on current node it will be
     * started.
     *
     * @param start Start cache.
     * @return Cache or {@code null} if there is no suitable cache.
     */
    public IgniteCacheProxy<?, ?> getOrStartPublicCache(boolean start) throws IgniteCheckedException {
        // Try to find started cache first.
        for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
            if (!e.getValue().context().userCache())
                continue;

            CacheConfiguration ccfg = e.getValue().configuration();

            String cacheName = ccfg.getName();

            return publicJCache(cacheName);
        }

        if (start) {
            for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
                DynamicCacheDescriptor desc = e.getValue();

                if (!desc.cacheType().userCache())
                    continue;

                CacheConfiguration ccfg = desc.cacheConfiguration();

                dynamicStartCache(null, ccfg.getName(), null, false, true, true).get();

                return publicJCache(ccfg.getName());
            }
        }

        return null;
    }

    /**
     * Gets a collection of currently started public cache names.
     *
     * @return Collection of currently started public cache names
     */
    public Collection<String> publicCacheNames() {
        return F.viewReadOnly(cacheDescriptors().values(),
            new IgniteClosure<DynamicCacheDescriptor, String>() {
                @Override public String apply(DynamicCacheDescriptor desc) {
                    return desc.cacheConfiguration().getName();
                }
            },
            new IgnitePredicate<DynamicCacheDescriptor>() {
                @Override public boolean apply(DynamicCacheDescriptor desc) {
                    return desc.cacheType().userCache();
                }
            }
        );
    }

    /**
     * Gets a collection of currently started public cache names.
     *
     * @return Collection of currently started public cache names
     */
    public Collection<String> publicAndDsCacheNames() {
        return F.viewReadOnly(cacheDescriptors().values(),
            new IgniteClosure<DynamicCacheDescriptor, String>() {
                @Override public String apply(DynamicCacheDescriptor desc) {
                    return desc.cacheConfiguration().getName();
                }
            },
            new IgnitePredicate<DynamicCacheDescriptor>() {
                @Override public boolean apply(DynamicCacheDescriptor desc) {
                    return desc.cacheType().userCache() || desc.cacheType() == CacheType.DATA_STRUCTURES;
                }
            }
        );
    }

    /**
     * Gets cache mode.
     *
     * @param cacheName Cache name to check.
     * @return Cache mode.
     */
    public CacheMode cacheMode(String cacheName) {
        assert cacheName != null;

        DynamicCacheDescriptor desc = cacheDescriptor(cacheName);

        return desc != null ? desc.cacheConfiguration().getCacheMode() : null;
    }

    /**
     * @return Caches to be started when this node starts.
     */
    @Nullable public LocalJoinCachesContext localJoinCachesContext() {
        if (ctx.discovery().localNode().order() == 1)
            cachesInfo.filterDynamicCacheDescriptors(locCfgMgr.localCachesOnStart());

        return cachesInfo.localJoinCachesContext();
    }

    /**
     * @param exchTopVer Local join exchange future version.
     * @param locJoinCtx Local join cache context.
     * @throws IgniteCheckedException If failed.
     */
    public IgniteInternalFuture<?> startCachesOnLocalJoin(
        AffinityTopologyVersion exchTopVer,
        LocalJoinCachesContext locJoinCtx
    ) throws IgniteCheckedException {
        long time = U.currentTimeMillis();

        if (locJoinCtx == null)
            return new GridFinishedFuture<>();

        IgniteInternalFuture<?> res = sharedCtx.affinity().initCachesOnLocalJoin(
            locJoinCtx.cacheGroupDescriptors(), locJoinCtx.cacheDescriptors());

        List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream()
            .map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false))
            .collect(toList());

        locJoinCtx.initCaches()
            .forEach(cacheDesc -> {
                try {
                    ctx.query().initQueryStructuresForNotStartedCache(cacheDesc);
                }
                catch (Exception e) {
                    log.error("Can't initialize query structures for not started cache [cacheName=" +
                        cacheDesc.cacheName() + "]", e);
                }
            });

        prepareStartCaches(startCacheInfos);

        context().exchange().exchangerUpdateHeartbeat();

        if (log.isInfoEnabled())
            log.info("Starting caches on local join performed in " + (U.currentTimeMillis() - time) + " ms.");

        return res;
    }

    /**
     * @param node Joined node.
     * @return {@code True} if there are new caches received from joined node.
     */
    public boolean hasCachesReceivedFromJoin(ClusterNode node) {
        return cachesInfo.hasCachesReceivedFromJoin(node.id());
    }

    /**
     * Starts statically configured caches received from remote nodes during exchange.
     *
     * @param nodeId Joining node ID.
     * @param exchTopVer Current exchange version.
     * @return Started caches descriptors.
     * @throws IgniteCheckedException If failed.
     */
    public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
        throws IgniteCheckedException {
        List<DynamicCacheDescriptor> receivedCaches = cachesInfo.cachesReceivedFromJoin(nodeId);

        List<StartCacheInfo> startCacheInfos = receivedCaches.stream()
            .filter(desc -> isLocalAffinity(desc.groupDescriptor().config()))
            .map(desc -> new StartCacheInfo(desc, null, exchTopVer, false))
            .collect(toList());

        prepareStartCaches(startCacheInfos);

        return receivedCaches;
    }

    /**
     * @param cacheConfiguration Checked configuration.
     * @return {@code true} if local node is affinity node for cache.
     */
    private boolean isLocalAffinity(CacheConfiguration cacheConfiguration) {
        return CU.affinityNode(ctx.discovery().localNode(), cacheConfiguration.getNodeFilter());
    }

    /**
     * Start all input caches in parallel.
     *
     * @param startCacheInfos All caches information for start.
     */
    void prepareStartCaches(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
        prepareStartCaches(startCacheInfos, (data, operation) -> {
            operation.apply(data); // PROXY
        });
    }

    /**
     * Trying to start all input caches in parallel and skip failed caches.
     *
     * @param startCacheInfos Caches info for start.
     * @return Caches which was failed.
     * @throws IgniteCheckedException if failed.
     */
    Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(
        Collection<StartCacheInfo> startCacheInfos
    ) throws IgniteCheckedException {
        HashMap<StartCacheInfo, IgniteCheckedException> failedCaches = new HashMap<>();

        prepareStartCaches(startCacheInfos, (data, operation) -> {
            try {
                operation.apply(data);
            }
            catch (IgniteInterruptedCheckedException e) {
                throw e;
            }
            catch (IgniteCheckedException e) {
                log.warning("Cache can not be started : cache=" + data.getStartedConfiguration().getName());

                failedCaches.put(data, e);
            }
        });

        return failedCaches;
    }

    /**
     * Start all input caches in parallel.
     *
     * @param startCacheInfos All caches information for start.
     * @param cacheStartFailHandler Fail handler for one cache start.
     */
    private void prepareStartCaches(
        Collection<StartCacheInfo> startCacheInfos,
        StartCacheFailHandler<StartCacheInfo, Void> cacheStartFailHandler
    ) throws IgniteCheckedException {
        if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) {
            for (StartCacheInfo startCacheInfo : startCacheInfos) {
                cacheStartFailHandler.handle(
                    startCacheInfo,
                    cacheInfo -> {
                        prepareCacheStart(
                            cacheInfo.getCacheDescriptor(),
                            cacheInfo.getReqNearCfg(),
                            cacheInfo.getExchangeTopVer(),
                            cacheInfo.isDisabledAfterStart(),
                            cacheInfo.isClientCache()
                        );

                        return null;
                    }
                );

                context().exchange().exchangerUpdateHeartbeat();
            }
        }
        else {
            Map<StartCacheInfo, GridCacheContext> cacheCtxs = new ConcurrentHashMap<>();

            // Reserve at least 2 threads for system operations.
            int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);

            doInParallel(
                parallelismLvl,
                sharedCtx.kernalContext().pools().getSystemExecutorService(),
                startCacheInfos,
                startCacheInfo -> {
                    cacheStartFailHandler.handle(
                        startCacheInfo,
                        cacheInfo -> {
                            GridCacheContext cacheCtx = prepareCacheContext(
                                cacheInfo.getCacheDescriptor(),
                                cacheInfo.getReqNearCfg(),
                                cacheInfo.getExchangeTopVer(),
                                cacheInfo.isDisabledAfterStart()
                            );
                            cacheCtxs.put(cacheInfo, cacheCtx);

                            context().exchange().exchangerUpdateHeartbeat();

                            return null;
                        }
                    );

                    return null;
                }
            );

            /*
             * This hack required because we can't start sql schema in parallel by folowing reasons:
             * * checking index to duplicate(and other checking) require one order on every nodes.
             * * onCacheStart and createSchema contains a lot of mutex.
             *
             * TODO IGNITE-9729
             */
            Set<StartCacheInfo> successfullyPreparedCaches = cacheCtxs.keySet();

            List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream()
                .filter(successfullyPreparedCaches::contains)
                .collect(toList());

            for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) {
                cacheStartFailHandler.handle(
                    startCacheInfo,
                    cacheInfo -> {
                        GridCacheContext cctx = cacheCtxs.get(cacheInfo);

                        if (!cctx.isRecoveryMode()) {
                            ctx.query().onCacheStart(
                                new GridCacheContextInfo(cctx, cacheInfo.isClientCache()),
                                cacheInfo.getCacheDescriptor().schema() != null
                                    ? cacheInfo.getCacheDescriptor().schema()
                                    : new QuerySchema(),
                                cacheInfo.getCacheDescriptor().sql()
                            );
                        }

                        context().exchange().exchangerUpdateHeartbeat();

                        return null;
                    }
                );
            }

            doInParallel(
                parallelismLvl,
                sharedCtx.kernalContext().pools().getSystemExecutorService(),
                cacheCtxs.entrySet(),
                cacheCtxEntry -> {
                    cacheStartFailHandler.handle(
                        cacheCtxEntry.getKey(),
                        cacheInfo -> {
                            GridCacheContext<?, ?> cacheCtx = cacheCtxEntry.getValue();

                            if (cacheCtx.isRecoveryMode())
                                finishRecovery(cacheInfo.getExchangeTopVer(), cacheCtx);
                            else
                                onCacheStarted(cacheCtxEntry.getValue());

                            context().exchange().exchangerUpdateHeartbeat();

                            return null;
                        }
                    );

                    return null;
                }
            );
        }
    }

    /**
     * @param desc Cache descriptor.
     * @param reqNearCfg Near configuration if specified for client cache start request.
     * @param exchTopVer Current exchange version.
     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will
     * change state of proxies to restarting
     * @throws IgniteCheckedException If failed.
     */
    public void prepareCacheStart(
        DynamicCacheDescriptor desc,
        @Nullable NearCacheConfiguration reqNearCfg,
        AffinityTopologyVersion exchTopVer,
        boolean disabledAfterStart,
        boolean clientCache
    ) throws IgniteCheckedException {
        GridCacheContext cacheCtx = prepareCacheContext(desc, reqNearCfg, exchTopVer, disabledAfterStart);

        if (cacheCtx.isRecoveryMode())
            finishRecovery(exchTopVer, cacheCtx);
        else {
            ctx.query().onCacheStart(
                    new GridCacheContextInfo(cacheCtx, clientCache),
                    desc.schema() != null ? desc.schema() : new QuerySchema(),
                    desc.sql()
            );

            onCacheStarted(cacheCtx);
        }
    }

    /**
     * Preparing cache context to start.
     *
     * @param desc Cache descriptor.
     * @param reqNearCfg Near configuration if specified for client cache start request.
     * @param exchTopVer Current exchange version.
     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change
     *  state of proxies to restarting
     * @return Created {@link GridCacheContext}.
     * @throws IgniteCheckedException if failed.
     */
    private GridCacheContext prepareCacheContext(
        DynamicCacheDescriptor desc,
        @Nullable NearCacheConfiguration reqNearCfg,
        AffinityTopologyVersion exchTopVer,
        boolean disabledAfterStart
    ) throws IgniteCheckedException {
        desc = enricher().enrich(desc, isLocalAffinity(desc.cacheConfiguration()));

        CacheConfiguration startCfg = desc.cacheConfiguration();

        if (caches.containsKey(startCfg.getName())) {
            GridCacheAdapter<?, ?> existingCache = caches.get(startCfg.getName());

            GridCacheContext<?, ?> cctx = existingCache.context();

            assert cctx.isRecoveryMode();

            QuerySchema locSchema = recovery.querySchemas.get(desc.cacheId());

            QuerySchemaPatch locSchemaPatch = locSchema.makePatch(desc.schema().entities());

            // Cache schema is changed after restart, workaround is stop existing cache and start new.
            if (!locSchemaPatch.isEmpty() || locSchemaPatch.hasConflicts())
                stopCacheSafely(cctx);
            else
                return existingCache.context();
        }

        assert !caches.containsKey(startCfg.getName()) : startCfg.getName();

        CacheConfiguration ccfg = new CacheConfiguration(startCfg);

        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);

        boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg);

        ctx.cache().context().database().checkpointReadLock();

        try {
            CacheGroupContext grp = getOrCreateCacheGroupContext(
                desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName(), false);

            GridCacheContext cacheCtx = createCacheContext(ccfg,
                grp,
                null,
                desc,
                exchTopVer,
                cacheObjCtx,
                affNode,
                true,
                disabledAfterStart,
                false
            );

            initCacheContext(cacheCtx, ccfg);

            return cacheCtx;
        }
        finally {
            ctx.cache().context().database().checkpointReadUnlock();
        }
    }

    /**
     * Stops cache under checkpoint lock.
     *
     * @param cctx Cache context.
     */
    private void stopCacheSafely(GridCacheContext<?, ?> cctx) {
        stopCacheSafely(cctx, true);
    }

    /**
     * Stops cache under checkpoint lock.
     *
     * @param cctx Cache context.
     * @param clearDbObjects If {@code false} DB objects don't removed (used for cache.close() on client node).
     * The parameters was added due to make differ between cache.close() on client node and distributed destroy cache
     * (e.g. call cache.destroy()).
     *
     * Before add the parameter {@code clearDbObjects}:
     * when on client node is joined we initialize H2 objects for all caches in cluster,
     * but on client cache.close() we destroy part of created objects,
     * making impossible running SQL queries on that cache;
     * clientCache.close() should restore status quo of state right after client join instead).
     */
    private void stopCacheSafely(GridCacheContext<?, ?> cctx, boolean clearDbObjects) {
        sharedCtx.database().checkpointReadLock();

        try {
            prepareCacheStop(cctx.name(), false, false, clearDbObjects);

            if (!cctx.group().hasCaches())
                stopCacheGroup(cctx.group().groupId(), false);
        }
        finally {
            sharedCtx.database().checkpointReadUnlock();
        }

    }

    /**
     * Finishes recovery for given cache context.
     *
     * @param cacheStartVer Cache join to topology version.
     * @param cacheContext Cache context.
     * @throws IgniteCheckedException If failed.
     */
    private void finishRecovery(
        AffinityTopologyVersion cacheStartVer,
        GridCacheContext<?, ?> cacheContext
    ) throws IgniteCheckedException {
        CacheGroupContext grpCtx = cacheContext.group();

        // Take cluster-wide cache descriptor and try to update local cache and cache group parameters.
        DynamicCacheDescriptor updatedDescriptor = cacheDescriptor(cacheContext.cacheId());

        grpCtx.finishRecovery(
            cacheStartVer,
            updatedDescriptor.receivedFrom(),
            isLocalAffinity(updatedDescriptor.cacheConfiguration())
        );

        cacheContext.finishRecovery(cacheStartVer, updatedDescriptor);

        if (isNearEnabled(cacheContext)) {
            GridDhtCacheAdapter dht = cacheContext.near().dht();

            dht.context().finishRecovery(cacheStartVer, updatedDescriptor);
        }

        onKernalStart(cacheContext.cache());

        if (log.isInfoEnabled())
            log.info("Finished recovery for cache [cache=" + cacheContext.name()
                + ", grp=" + grpCtx.cacheOrGroupName() + ", startVer=" + cacheStartVer + "]");
    }

    /**
     * Stops all caches and groups, that was recovered, but not activated on node join. Such caches can remain only if
     * it was filtered by node filter on current node. It's impossible to check whether current node is affinity node
     * for given cache before join to topology.
     */
    public void shutdownNotFinishedRecoveryCaches() {
        for (GridCacheAdapter cacheAdapter : caches.values()) {
            GridCacheContext cacheCtx = cacheAdapter.context();

            if (cacheCtx.isRecoveryMode()) {
                assert !isLocalAffinity(cacheCtx.config())
                    : "Cache " + cacheAdapter.context() + " is still in recovery mode after start, but not activated.";

                stopCacheSafely(cacheCtx);
            }
        }
    }

    /**
     * Check for affinity node and customize near configuration if needed.
     *
     * @param desc Cache descriptor.
     * @param reqNearCfg Near configuration if specified for client cache start request.
     * @param ccfg Cache configuration to use.
     * @return {@code true} if it is affinity node for cache.
     */
    private boolean checkForAffinityNode(
        DynamicCacheDescriptor desc,
        @Nullable NearCacheConfiguration reqNearCfg,
        CacheConfiguration ccfg
    ) {
        if (isLocalAffinity(desc.cacheConfiguration()))
            return true;

        ccfg.setNearConfiguration(reqNearCfg);

        return false;
    }

    /**
     * Prepare page store for start cache.
     *
     * @param desc Cache descriptor.
     * @param affNode {@code true} if it is affinity node for cache.
     * @throws IgniteCheckedException if failed.
     */
    public void preparePageStore(DynamicCacheDescriptor desc, boolean affNode) throws IgniteCheckedException {
        if (sharedCtx.pageStore() != null && affNode)
            initializationProtector.protect(
                desc.groupDescriptor().groupId(),
                () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData(splitter).config())
            );
    }

    /**
     * Prepare cache group to start cache.
     *
     * @param desc Cache descriptor.
     * @param exchTopVer Current exchange version.
     * @param cacheObjCtx Cache object context.
     * @param affNode {@code true} if it is affinity node for cache.
     * @param grpName Group name.
     * @return Prepared cache group context.
     * @throws IgniteCheckedException if failed.
     */
    private CacheGroupContext getOrCreateCacheGroupContext(
        DynamicCacheDescriptor desc,
        AffinityTopologyVersion exchTopVer,
        CacheObjectContext cacheObjCtx,
        boolean affNode,
        String grpName,
        boolean recoveryMode
    ) throws IgniteCheckedException {
        if (grpName != null) {
            return initializationProtector.protect(
                desc.groupId(),
                () -> findCacheGroup(grpName),
                () -> startCacheGroup(
                    desc.groupDescriptor(),
                    desc.cacheType(),
                    affNode,
                    cacheObjCtx,
                    exchTopVer,
                    recoveryMode
                )
            );
        }

        return startCacheGroup(desc.groupDescriptor(),
            desc.cacheType(),
            affNode,
            cacheObjCtx,
            exchTopVer,
            recoveryMode
        );
    }

    /**
     * Initialize created cache context.
     *
     * @param cacheCtx Cache context to initializtion.
     * @param cfg Cache configuration.
     * @throws IgniteCheckedException if failed.
     */
    private void initCacheContext(
        GridCacheContext<?, ?> cacheCtx,
        CacheConfiguration cfg
    ) throws IgniteCheckedException {
        GridCacheAdapter cache = cacheCtx.cache();

        sharedCtx.addCacheContext(cacheCtx);

        caches.put(cacheCtx.name(), cache);

        // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
        if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
            && !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
            U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
                "BinaryMarshaller is not used");

        // Start managers.
        for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
            mgr.start(cacheCtx);

        cacheCtx.initConflictResolver();

        if (GridCacheUtils.isNearEnabled(cfg)) {
            GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();

            // Start DHT managers.
            for (GridCacheManager mgr : dhtManagers(dhtCtx))
                mgr.start(dhtCtx);

            dhtCtx.initConflictResolver();

            // Start DHT cache.
            dhtCtx.cache().start();

            if (log.isDebugEnabled())
                log.debug("Started DHT cache: " + dhtCtx.cache().name());
        }

        ctx.continuous().onCacheStart(cacheCtx);

        cacheCtx.cache().start();
    }

    /**
     * Handle of cache context which was fully prepared.
     *
     * @param cacheCtx Fully prepared context.
     * @throws IgniteCheckedException if failed.
     */
    private void onCacheStarted(GridCacheContext cacheCtx) throws IgniteCheckedException {
        GridCacheAdapter cache = cacheCtx.cache();
        CacheConfiguration cfg = cacheCtx.config();
        CacheGroupContext grp = cacheGrps.get(cacheCtx.groupId());

        cacheCtx.onStarted();

        String dataRegion = cfg.getDataRegionName();

        if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
            dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();

        if (log.isInfoEnabled()) {
            String expPlcInfo = buildExpirePolicyInfo(cacheCtx);

            log.info("Started cache [name=" + cfg.getName() +
                ", id=" + cacheCtx.cacheId() +
                (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
                ", dataRegionName=" + dataRegion +
                ", mode=" + cfg.getCacheMode() +
                ", atomicity=" + cfg.getAtomicityMode() +
                ", backups=" + cfg.getBackups() +
                (expPlcInfo != null ? ", " + expPlcInfo : "") + ']');
        }

        grp.onCacheStarted(cacheCtx);

        onKernalStart(cache);

        if (ctx.performanceStatistics().enabled() && U.isLocalNodeCoordinator(ctx.discovery()))
            ctx.performanceStatistics().cacheStart(cacheCtx.cacheId(), cfg.getName());
    }

    /**
     * @param desc Cache descriptor.
     * @throws IgniteCheckedException If failed.
     */
    private GridCacheContext<?, ?> startCacheInRecoveryMode(
        DynamicCacheDescriptor desc
    ) throws IgniteCheckedException {
        // Only affinity nodes are able to start cache in recovery mode.
        desc = enricher().enrich(desc, true);

        CacheConfiguration cfg = desc.cacheConfiguration();

        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);

        preparePageStore(desc, true);

        CacheGroupContext grpCtx;
        GridCacheContext cacheCtx;

        ctx.cache().context().database().checkpointReadLock();

        try {
            grpCtx = getOrCreateCacheGroupContext(
                desc,
                AffinityTopologyVersion.NONE,
                cacheObjCtx,
                true,
                cfg.getGroupName(),
                true
            );

            cacheCtx = createCacheContext(cfg,
                grpCtx,
                null,
                desc,
                AffinityTopologyVersion.NONE,
                cacheObjCtx,
                true,
                true,
                false,
                true
            );

            initCacheContext(cacheCtx, cfg);
        }
        finally {
            ctx.cache().context().database().checkpointReadUnlock();
        }

        cacheCtx.onStarted();

        String dataRegion = cfg.getDataRegionName();

        if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
            dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();

        grpCtx.onCacheStarted(cacheCtx);

        ctx.query().onCacheStart(new GridCacheContextInfo(cacheCtx, false),
            desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql());

        if (log.isInfoEnabled()) {
            String expPlcInfo = buildExpirePolicyInfo(cacheCtx);

            log.info("Started cache in recovery mode [name=" + cfg.getName() +
                ", id=" + cacheCtx.cacheId() +
                (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
                ", dataRegionName=" + dataRegion +
                ", mode=" + cfg.getCacheMode() +
                ", atomicity=" + cfg.getAtomicityMode() +
                ", backups=" + cfg.getBackups() +
                (expPlcInfo != null ? ", " + expPlcInfo : "") + ']');
        }

        return cacheCtx;
    }

    /**
     * Build formatted string with expire policy info.
     *
     * @param cacheCtx - cache context.
     * @return formatted expire policy info.
     */
    private String buildExpirePolicyInfo(@NotNull GridCacheContext cacheCtx) {
        ExpiryPolicy expPlc = cacheCtx.expiry();
        if (expPlc == null || expPlc instanceof EternalExpiryPolicy)
            return null;

        return String.format(EXPRITY_POLICY_MSG, expPlc.getClass().getName(), cacheCtx.ttl().eagerTtlEnabled());
    }

    /**
     * @param grpName Group name.
     * @return Found group or null.
     */
    private CacheGroupContext findCacheGroup(String grpName) {
        return cacheGrps.values().stream()
            .filter(grp -> grp.sharedGroup() && grpName.equals(grp.name()))
            .findAny()
            .orElse(null);
    }

    /**
     * Restarts proxies of caches if they was marked as restarting. Requires external synchronization - shouldn't be
     * called concurrently with another caches restart.
     */
    public void restartProxies() {
        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
            if (proxy == null)
                continue;

            GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));

            if (cacheCtx == null)
                continue;

            if (proxy.isRestarting()) {
                caches.get(proxy.getName()).active(true);

                proxy.onRestarted(cacheCtx, cacheCtx.cache());

                if (cacheCtx.dataStructuresCache())
                    ctx.dataStructures().restart(proxy.getName(), proxy.internalProxy());
            }
        }
    }

    /**
     * Complete stopping of caches if they were marked as restarting but it failed.
     * @return Cache names of proxies which were restarted.
     */
    public List<String> resetRestartingProxies() {
        List<String> res = new ArrayList<>();

        for (Map.Entry<String, IgniteCacheProxyImpl<?, ?>> e : jCacheProxies.entrySet()) {
            IgniteCacheProxyImpl<?, ?> proxy = e.getValue();

            if (proxy == null)
                continue;

            if (proxy.isRestarting()) {
                String cacheName = e.getKey();

                res.add(cacheName);

                jCacheProxies.remove(cacheName);

                proxy.onRestarted(null, null);

                if (DataStructuresProcessor.isDataStructureCache(cacheName))
                    ctx.dataStructures().restart(cacheName, null);
            }
        }

        cachesInfo.removeRestartingCaches();

        return res;
    }

    /**
     * @param desc Group descriptor.
     * @param cacheType Cache type.
     * @param affNode Affinity node flag.
     * @param cacheObjCtx Cache object context.
     * @param exchTopVer Current topology version.
     * @return Started cache group.
     * @throws IgniteCheckedException If failed.
     */
    private CacheGroupContext startCacheGroup(
        CacheGroupDescriptor desc,
        CacheType cacheType,
        boolean affNode,
        CacheObjectContext cacheObjCtx,
        AffinityTopologyVersion exchTopVer,
        boolean recoveryMode
    ) throws IgniteCheckedException {
        desc = enricher().enrich(desc, affNode);

        CacheConfiguration cfg = new CacheConfiguration(desc.config());

        String memPlcName = cfg.getDataRegionName();

        DataRegion dataRegion = affNode ? sharedCtx.database().dataRegion(memPlcName) : null;

        boolean needToStart = (dataRegion != null)
            && (cacheType != CacheType.USER
                || (sharedCtx.isLazyMemoryAllocation(dataRegion)
                    && !cacheObjCtx.kernalContext().clientNode()));

        if (needToStart)
            dataRegion.pageMemory().start();

        FreeList freeList = sharedCtx.database().freeList(memPlcName);
        ReuseList reuseList = sharedCtx.database().reuseList(memPlcName);

        boolean persistenceEnabled = recoveryMode || sharedCtx.localNode().isClient() ? desc.persistenceEnabled() :
            dataRegion != null && dataRegion.config().isPersistenceEnabled();

        CompressionHandler compressHnd = CompressionHandler.create(ctx, cfg);

        if (log.isInfoEnabled() && compressHnd.compressionEnabled()) {
            log.info("Disk page compression is enabled [cacheGrp=" + CU.cacheOrGroupName(cfg) +
                ", compression=" + compressHnd.diskPageCompression() + ", level=" +
                compressHnd.diskPageCompressionLevel() + "]");
        }

        CacheGroupContext grp = new CacheGroupContext(sharedCtx,
            desc.groupId(),
            desc.receivedFrom(),
            cacheType,
            cfg,
            affNode,
            dataRegion,
            cacheObjCtx,
            freeList,
            reuseList,
            exchTopVer,
            persistenceEnabled,
            desc.walEnabled(),
            recoveryMode,
            compressHnd
        );

        for (Object obj : grp.configuredUserObjects())
            prepare(cfg, obj, false);

        U.startLifecycleAware(grp.configuredUserObjects());

        grp.start();

        CacheGroupContext old = cacheGrps.put(desc.groupId(), grp);

        assert old == null : old.name();

        return grp;
    }

    /**
     * @param cacheName Cache name.
     * @param stop {@code True} for stop cache, {@code false} for close cache.
     * @param restart Restart flag.
     */
    public void blockGateway(String cacheName, boolean stop, boolean restart) {
        // Break the proxy before exchange future is done.
        IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, false);

        if (restart) {
            GridCacheAdapter<?, ?> cache = caches.get(cacheName);

            if (cache != null)
                cache.active(false);
        }

        if (stop) {
            if (restart) {
                GridCacheAdapter<?, ?> cache;

                if (proxy == null && (cache = caches.get(cacheName)) != null) {
                    proxy = new IgniteCacheProxyImpl(cache.context(), cache, false);

                    IgniteCacheProxyImpl<?, ?> oldProxy = jCacheProxies.putIfAbsent(cacheName, proxy);

                    if (oldProxy != null)
                        proxy = oldProxy;
                }

                if (proxy != null)
                    proxy.suspend();
            }

            if (proxy != null)
                proxy.context0().gate().stopped();
        }
        else if (proxy != null)
            proxy.closeProxy();
    }

    /**
     * @param req Request.
     */
    private void stopGateway(DynamicCacheChangeRequest req) {
        assert req.stop() : req;

        IgniteCacheProxyImpl<?, ?> proxy;

        // Break the proxy before exchange future is done.
        if (req.restart()) {
            if (DataStructuresProcessor.isDataStructureCache(req.cacheName()))
                ctx.dataStructures().suspend(req.cacheName());

            GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());

            if (cache != null)
                cache.active(false);

            proxy = jCacheProxies.get(req.cacheName());

            if (proxy != null)
                proxy.suspend();
        }
        else {
            completeProxyInitialize(req.cacheName());

            proxy = jCacheProxies.remove(req.cacheName());
        }

        if (proxy != null)
            proxy.context0().gate().onStopped();
    }

    /**
     * @param cacheName Cache name.
     * @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
     * @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
     */
    public void prepareCacheStop(String cacheName, boolean callDestroy, boolean clearCache) {
        prepareCacheStop(cacheName, callDestroy, clearCache, true);
    }

    /**
     * @param cacheName Cache name.
     * @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
     * @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
     * @param clearDbObjects If {@code false} DB objects don't removed (used for cache.close() on client node).
     */
    public void prepareCacheStop(String cacheName, boolean callDestroy, boolean clearCache, boolean clearDbObjects) {
        assert sharedCtx.database().checkpointLockIsHeldByThread();

        GridCacheAdapter<?, ?> cache = caches.remove(cacheName);

        if (cache != null) {
            GridCacheContext<?, ?> ctx = cache.context();

            sharedCtx.removeCacheContext(ctx);

            onKernalStop(cache, true);

            stopCache(cache, true, callDestroy, clearCache, clearDbObjects);
        }
        else {
            // Try to unregister query structures for not started caches.
            ctx.query().onCacheStop(cacheName);

            // Cache adapter may not exist due to the node filter.
            DynamicCacheDescriptor cacheToDelete = callDestroy ? cachesInfo.markedForDeletionCache(cacheName) : null;

            if (cacheToDelete != null)
                removeCacheConfig(cacheToDelete.cacheConfiguration());
        }
    }

    /**
     * @param startTopVer Cache start version.
     * @param err Cache start error if any.
     */
    void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
        for (GridCacheAdapter<?, ?> cache : caches.values()) {
            GridCacheContext<?, ?> cacheCtx = cache.context();

            if (cacheCtx.startTopologyVersion().equals(startTopVer)) {
                if (!jCacheProxies.containsKey(cacheCtx.name())) {
                    IgniteCacheProxyImpl<?, ?> newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);

                    if (!cache.active())
                        newProxy.suspend();

                    addjCacheProxy(cacheCtx.name(), newProxy);
                }

                if (cacheCtx.preloader() != null)
                    cacheCtx.preloader().onInitialExchangeComplete(err);
            }
        }
    }

    /**
     * @param cachesToClose Caches to close.
     * @param retClientCaches {@code True} if return IDs of closed client caches.
     * @return Closed client caches' IDs.
     */
    Set<Integer> closeCaches(Set<String> cachesToClose, boolean retClientCaches) {
        Set<Integer> ids = null;

        for (String cacheName : cachesToClose) {
            completeProxyInitialize(cacheName);

            blockGateway(cacheName, false, false);

            GridCacheContext ctx = sharedCtx.cacheContext(CU.cacheId(cacheName));

            if (ctx == null)
                continue;

            if (retClientCaches && !ctx.affinityNode()) {
                if (ids == null)
                    ids = U.newHashSet(cachesToClose.size());

                ids.add(ctx.cacheId());
            }

            closeCache(ctx);
        }

        return ids;
    }

    /**
     * @param cctx Cache context.
     */
    private void closeCache(GridCacheContext cctx) {
        if (cctx.affinityNode()) {
            GridCacheAdapter<?, ?> cache = caches.get(cctx.name());

            assert cache != null : cctx.name();

            jCacheProxies.put(cctx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false));

            completeProxyInitialize(cctx.name());
        }
        else {
            cctx.gate().onStopped();

            // Do not close client cache while requests processing is in progress.
            sharedCtx.io().writeLock();

            try {
                if (!cctx.affinityNode() && cctx.transactional())
                    sharedCtx.tm().rollbackTransactionsForCache(cctx.cacheId());

                completeProxyInitialize(cctx.name());

                jCacheProxies.remove(cctx.name());

                closeCacheOnNotAffinityNode(cctx);
            }
            finally {
                sharedCtx.io().writeUnlock();
            }
        }
    }

    /**
     * @param cctx Cache context to close.
     */
    private void closeCacheOnNotAffinityNode(GridCacheContext cctx) {
        if (ctx.query().moduleEnabled())
            stopCacheSafely(cctx, false);
        else
            stopCacheSafely(cctx);
    }

    /**
     * Called during the rollback of the exchange partitions procedure in order to stop the given cache even if it's not
     * fully initialized (e.g. failed on cache init stage).
     *
     * @param topVer Topology version related to the given {@code exchActions}.
     * @param exchActions Stop requests.
     */
    void forceCloseCaches(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
        assert exchActions != null && !exchActions.cacheStopRequests().isEmpty();

        processCacheStopRequestOnExchangeDone(topVer, exchActions);
    }

    /**
     * @param topVer Topology version related to the given {@code exchActions}.
     * @param exchActions Change requests.
     */
    private void processCacheStopRequestOnExchangeDone(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
        // Reserve at least 2 threads for system operations.
        int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);

        List<IgniteBiTuple<CacheGroupContext, Boolean>> grpsToStop = exchActions.cacheGroupsToStop().stream()
            .filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
            .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
            .collect(toList());

        // Wait until all evictions are finished.
        grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));

        Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
            .collect(groupingBy(action -> action.descriptor().groupId()));

        Set<Integer> grpIdToDestroy = grpsToStop.stream()
            .filter(IgniteBiTuple::get2).map(t2 -> t2.get1().groupId()).collect(toSet());

        try {
            doInParallel(
                parallelismLvl,
                sharedCtx.kernalContext().pools().getSystemExecutorService(),
                cachesToStop.entrySet(),
                cachesToStopByGrp -> {
                    Integer grpId = cachesToStopByGrp.getKey();

                    CacheGroupContext gctx = cacheGrps.get(grpId);

                    if (gctx != null) {
                        final String msg = "Failed to wait for topology update, cache group is stopping.";

                        // If snapshot operation in progress we must throw CacheStoppedException
                        // for correct cache proxy restart. For more details see
                        // IgniteCacheProxy.cacheException()
                        gctx.affinity().cancelFutures(new CacheStoppedException(msg));
                    }

                    for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) {
                        // Rollback tx started before gateway blocked to avoid deadlock with gateway stop.
                        context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());

                        stopGateway(action.request());

                        // Rollback tx started after gateway blocked but not stopped.
                        context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());

                        String cacheName = action.request().cacheName();

                        GridCacheAdapter<?, ?> cache = caches.get(cacheName);

                        if (cache != null)
                            cache.context().ttl().unregister();
                    }

                    return null;
                }
            );

            grpsToStop.forEach(g -> g.get1().prepareToStop());

            if (!exchActions.cacheStopRequests().isEmpty())
                removeOffheapListenerAfterCheckpoint(grpsToStop);

            doInParallel(
                parallelismLvl,
                sharedCtx.kernalContext().pools().getSystemExecutorService(),
                cachesToStop.entrySet(),
                cachesToStopByGrp -> {
                    Integer grpId = cachesToStopByGrp.getKey();

                    CacheGroupContext gctx = cacheGrps.get(grpId);

                    if (gctx != null)
                        gctx.preloader().pause();

                    try {
                        for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) {
                            String cacheName = action.request().cacheName();

                            sharedCtx.database().checkpointReadLock();

                            try {
                                boolean callDestroy = action.request().destroy();

                                // If all caches in group will be destroyed it is not necessary to clear a single cache
                                // because group will be stopped anyway.
                                // Stop will still be called with destroy {@code true}, but cache will not be cleared.
                                boolean clearCache = callDestroy && !grpIdToDestroy.contains(grpId);

                                prepareCacheStop(cacheName, callDestroy, clearCache);

                                if (callDestroy || grpIdToDestroy.contains(grpId))
                                    ctx.query().completeRebuildIndexes(cacheName);
                            }
                            finally {
                                sharedCtx.database().checkpointReadUnlock();
                            }
                        }
                    }
                    finally {
                        if (gctx != null)
                            gctx.preloader().resume();
                    }

                    return null;
                }
            );
        }
        catch (IgniteCheckedException e) {
            String msg = "Failed to stop caches";

            log.error(msg, e);

            throw new IgniteException(msg, e);
        }
        finally {
            cachesInfo.cleanupRemovedCaches(topVer);
        }

        for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
            stopCacheGroup(grp.get1().groupId(), grp.get2());

        if (!sharedCtx.kernalContext().clientNode())
            sharedCtx.database().onCacheGroupsStopped(grpsToStop);

        cachesInfo.cleanupRemovedCacheGroups(topVer);

        if (exchActions.deactivate())
            sharedCtx.deactivate();
    }

    /**
     * @param rmtNode Remote node to check.
     * @return Data storage configuration
     */
    private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
        return GridCacheUtils.extractDataStorage(
            rmtNode,
            ctx.marshallerContext().jdkMarshaller(),
            U.resolveClassLoader(ctx.config())
        );
    }

    /**
     * @param dataStorageCfg User-defined data regions.
     */
    private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
        if (dataStorageCfg != null) {
            return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
                .map(Stream::of)
                .orElseGet(Stream::empty)
                .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
        }

        return Collections.emptyMap();
    }

    /**
     * Force checkpoint and remove offheap checkpoint listener after it was finished.
     *
     * @param grpToStop Cache group to stop.
     */
    private void removeOffheapListenerAfterCheckpoint(List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop) {
        try {
            sharedCtx.database().waitForCheckpoint(
                "caches stop", (fut) -> removeOffheapCheckpointListener(grpToStop)
            );
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
        }
    }

    /**
     * @param grpToStop Group for which listener shuold be removed.
     */
    private void removeOffheapCheckpointListener(List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop) {
        sharedCtx.database().checkpointReadLock();
        try {
            // Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption.
            grpToStop.forEach(grp -> {
                CacheGroupContext gctx = grp.getKey();

                if (gctx != null && gctx.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
                    GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database();
                    mngr.removeCheckpointListener((CheckpointListener)gctx.offheap());
                }
            });
        }
        finally {
            sharedCtx.database().checkpointReadUnlock();
        }
    }

    /**
     * Callback invoked when first exchange future for dynamic cache is completed.
     *
     * @param cacheStartVer Started caches version to create proxy for.
     * @param exchActions Change requests.
     * @param err Error.
     */
    public void onExchangeDone(
        AffinityTopologyVersion cacheStartVer,
        @Nullable ExchangeActions exchActions,
        @Nullable Throwable err
    ) {
        initCacheProxies(cacheStartVer, err);

        if (exchActions == null)
            return;

        if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null)
            ctx.dataStructures().restoreStructuresState(ctx);

        if (err == null)
            processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
    }

    /**
     * @param grpId Group ID.
     * @param destroy Group destroy flag.
     */
    private void stopCacheGroup(int grpId, boolean destroy) {
        CacheGroupContext grp = cacheGrps.remove(grpId);

        if (grp != null)
            stopCacheGroup(grp, destroy);
    }

    /**
     * @param grp Cache group.
     * @param destroy Group destroy flag.
     */
    private void stopCacheGroup(CacheGroupContext grp, boolean destroy) {
        grp.stopGroup();

        U.stopLifecycleAware(log, grp.configuredUserObjects());

        cleanup(grp, destroy);
    }

    /**
     * @param cacheName Cache name.
     * @param deploymentId Future deployment ID.
     */
    void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
        GridCacheProcessor.TemplateConfigurationFuture fut =
            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);

        if (fut != null && fut.deploymentId().equals(deploymentId))
            fut.onDone();
    }

    /**
     * @param req Request to complete future for.
     * @param success Future result.
     * @param err Error if any.
     */
    public void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Throwable err) {
        if (ctx.localNodeId().equals(req.initiatingNodeId())) {
            DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());

            if (fut != null)
                fut.onDone(success, err);
        }
    }

    /**
     * @param reqId Request ID.
     * @param err Error if any.
     */
    void completeClientCacheChangeFuture(UUID reqId, @Nullable Exception err) {
        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId);

        if (fut != null)
            fut.onDone(false, err);
    }

    /**
     * Creates shared context.
     *
     * @param kernalCtx Kernal context.
     * @param storeSesLsnrs Store session listeners.
     * @return Shared context.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("unchecked")
    private GridCacheSharedContext createSharedContext(
        GridKernalContext kernalCtx,
        Collection<CacheStoreSessionListener> storeSesLsnrs
    ) throws IgniteCheckedException {
        IgniteCacheDatabaseSharedManager dbMgr;
        IgnitePageStoreManager pageStoreMgr = null;
        IgniteWriteAheadLogManager walMgr = null;
        CdcManager cdcMgr = null;

        if (CU.isPersistenceEnabled(ctx.config()) && !ctx.clientNode()) {
            dbMgr = new GridCacheDatabaseSharedManager(ctx);

            pageStoreMgr = ctx.plugins().createComponent(IgnitePageStoreManager.class);

            if (pageStoreMgr == null)
                pageStoreMgr = new FilePageStoreManager(ctx);
        }
        else {
            if (CU.isPersistenceEnabled(ctx.config()) && ctx.clientNode()) {
                U.warn(log, "Persistent Store is not supported on client nodes (Persistent Store's" +
                    " configuration will be ignored).");
            }

            dbMgr = new IgniteCacheDatabaseSharedManager(ctx);
        }

        if ((CU.isPersistenceEnabled(ctx.config()) || CU.isCdcEnabled(ctx.config())) && !ctx.clientNode()) {
            walMgr = ctx.plugins().createComponent(IgniteWriteAheadLogManager.class);

            if (walMgr == null)
                walMgr = new FileWriteAheadLogManager(ctx);
        }

        if (CU.isCdcEnabled(ctx.config()) && !ctx.clientNode()) {
            cdcMgr = ctx.plugins().createComponent(CdcManager.class);

            if (cdcMgr != null) {
                if (ctx.config().getDataStorageConfiguration().getWalMode() != WALMode.LOG_ONLY) {
                    U.warn(log, "Custom CdcManager is only supported for WALMode.LOG_ONLY. CdcManager configuration will be ignored.");

                    cdcMgr = null;
                }

                if (!IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true)) {
                    U.warn(log, "Custom CdcManager is only supported for IGNITE_WAL_MMAP=true. CdcManager configuration will be ignored.");

                    cdcMgr = null;
                }
            }

            if (cdcMgr == null)
                cdcMgr = new CdcUtilityActiveCdcManager();
        }

        IgniteSnapshotManager snapshotMgr = ctx.plugins().createComponent(IgniteSnapshotManager.class);

        if (snapshotMgr == null)
            snapshotMgr = new IgniteSnapshotManager(ctx);

        return GridCacheSharedContext.builder()
            .setTxManager(new IgniteTxManager())
            .setMvccManager(new GridCacheMvccManager())
            .setVersionManager(new GridCacheVersionManager())
            .setDeploymentManager(new GridCacheDeploymentManager())
            .setPartitionExchangeManager(new GridCachePartitionExchangeManager())
            .setDatabaseManager(dbMgr)
            .setPageStoreManager(pageStoreMgr)
            .setWalManager(walMgr)
            .setWalStateManager(new WalStateManager(ctx))
            .setSnapshotManager(snapshotMgr)
            .setIoManager(new GridCacheIoManager())
            .setAffinityManager(new CacheAffinitySharedManager())
            .setTtlCleanupManager(new GridCacheSharedTtlCleanupManager())
            .setPartitionsEvictManager(new PartitionsEvictManager())
            .setJtaManager(JTA.createOptional())
            .setDiagnosticManager(new CacheDiagnosticManager())
            .setCdcManager(cdcMgr)
            .build(kernalCtx, storeSesLsnrs);
    }

    /** {@inheritDoc} */
    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
        return CACHE_PROC;
    }

    /** {@inheritDoc} */
    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
        cachesInfo.collectJoiningNodeData(dataBag);
    }

    /** {@inheritDoc} */
    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
        cachesInfo.collectGridNodeData(dataBag, backwardCompatibleSplitter());
    }

    /** {@inheritDoc} */
    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
        cachesInfo.onJoiningNodeDataReceived(data);
    }

    /** {@inheritDoc} */
    @Override public void onGridDataReceived(GridDiscoveryData data) {
        cachesInfo.onGridDataReceived(data);

        sharedCtx.walState().onCachesInfoCollected();
    }

    /** {@inheritDoc} */
    @Override public @Nullable IgniteNodeValidationResult validateNode(
        ClusterNode node, JoiningNodeDiscoveryData discoData
    ) {
        if (!cachesInfo.isMergeConfigSupports(node))
            return null;

        String validationRes = cachesInfo.validateJoiningNodeData(discoData, node.isClient());

        if (validationRes != null)
            return new IgniteNodeValidationResult(node.id(), validationRes, validationRes);

        return ValidationOnNodeJoinUtils.validateNode(node, discoData, marsh, ctx, this::cacheDescriptor);
    }

    /**
     * @param msg Message.
     */
    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
        cachesInfo.onStateChangeFinish(msg);
    }

    /**
     * @param msg Message.
     * @param topVer Current topology version.
     * @param curState Current cluster state.
     * @return Exchange actions.
     * @throws IgniteCheckedException If configuration validation failed.
     */
    public ExchangeActions onStateChangeRequest(
        ChangeGlobalStateMessage msg,
        AffinityTopologyVersion topVer,
        DiscoveryDataClusterState curState
    ) throws IgniteCheckedException {
        return cachesInfo.onStateChangeRequest(msg, topVer, curState);
    }

    /**
     * Cache statistics flag change message received.
     *
     * @param msg Message.
     */
    public void onCacheStatisticsModeChange(CacheStatisticsModeChangeMessage msg) {
        assert msg != null;

        if (msg.initial()) {
            EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());

            if (fut != null && !cacheNames().containsAll(msg.caches())) {
                fut.onDone(new IgniteCheckedException("One or more cache descriptors not found [caches="
                    + caches + ']'));

                return;
            }

            for (String cacheName : msg.caches()) {
                DynamicCacheDescriptor desc = cachesInfo.registeredCaches().get(cacheName);

                if (desc != null) {
                    if (desc.cacheConfiguration().isStatisticsEnabled() != msg.enabled()) {
                        desc.cacheConfiguration().setStatisticsEnabled(msg.enabled());

                        try {
                            ctx.cache().saveCacheConfiguration(desc);
                        }
                        catch (IgniteCheckedException e) {
                            log.error("Error while saving cache configuration to disk, cfg = "
                                + desc.cacheConfiguration(), e);
                        }
                    }
                }
                else
                    log.warning("Failed to change cache descriptor configuration, cache not found [cacheName="
                        + cacheName + ']');
            }
        }
        else {
            EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());

            if (fut != null)
                fut.onDone();
        }
    }

    /**
     * Cache statistics clear message received.
     *
     * @param msg Message.
     */
    private void onCacheStatisticsClear(CacheStatisticsClearMessage msg) {
        assert msg != null;

        if (msg.initial()) {
            EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());

            if (fut != null && !cacheNames().containsAll(msg.caches())) {
                fut.onDone(new IgniteCheckedException("One or more cache descriptors not found [caches="
                    + caches + ']'));

                return;
            }

            for (String cacheName : msg.caches()) {
                GridCacheAdapter<?, ?> cache = ctx.cache().internalCache(cacheName);

                if (cache != null)
                    cache.metrics0().clear();
                else
                    log.warning("Failed to clear cache statistics, cache not found [cacheName="
                        + cacheName + ']');
            }
        }
        else {
            EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());

            if (fut != null)
                fut.onDone();
        }
    }

    /**
     * Cache statistics flag change task processed by exchange worker.
     *
     * @param msg Message.
     */
    public void processStatisticsModeChange(CacheStatisticsModeChangeMessage msg) {
        assert msg != null;

        for (String cacheName : msg.caches()) {
            IgniteInternalCache<Object, Object> cache = cache(cacheName);

            if (cache != null)
                cache.context().statisticsEnabled(msg.enabled());
            else
                log.warning("Failed to change cache configuration, cache not found [cacheName=" + cacheName + ']');
        }
    }

    /**
     * @param stoppedCaches Stopped caches.
     */
    private void stopCachesOnClientReconnect(Collection<GridCacheAdapter> stoppedCaches) {
        assert ctx.discovery().localNode().isClient();

        for (GridCacheAdapter cache : stoppedCaches) {
            CacheGroupContext grp = cache.context().group();

            onKernalStop(cache, true);
            stopCache(cache, true, false, false);

            sharedCtx.affinity().stopCacheOnReconnect(cache.context());

            if (!grp.hasCaches()) {
                stopCacheGroup(grp, false);

                sharedCtx.affinity().stopCacheGroupOnReconnect(grp);
            }
        }
    }

    /**
     * Dynamically starts cache using template configuration.
     *
     * @param cacheName Cache name.
     * @return Future that will be completed when cache is deployed.
     */
    public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
        try {
            CacheConfiguration cfg = getOrCreateConfigFromTemplate(cacheName);

            return dynamicStartCache(cfg, cacheName, null, true, true, true);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /**
     * Dynamically starts cache using template configuration.
     *
     * @param cacheName Cache name.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Future that will be completed when cache is deployed.
     */
    public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) {
        return getOrCreateFromTemplate(cacheName, cacheName, null, checkThreadTx);
    }

    /**
     * Dynamically starts cache using template configuration.
     *
     * @param cacheName Cache name.
     * @param templateName Cache template name.
     * @param cfgOverride Cache config properties to override.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Future that will be completed when cache is deployed.
     */
    public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, String templateName,
        CacheConfigurationOverride cfgOverride, boolean checkThreadTx) {
        assert cacheName != null;

        try {
            if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
                return new GridFinishedFuture<>();

            CacheConfiguration ccfg = F.isEmpty(templateName)
                ? getOrCreateConfigFromTemplate(cacheName)
                : getOrCreateConfigFromTemplate(templateName);

            ccfg.setName(cacheName);

            if (cfgOverride != null)
                cfgOverride.apply(ccfg);

            return dynamicStartCache(ccfg, cacheName, null, false, true, checkThreadTx);
        }
        catch (IgniteCheckedException e) {
            return new GridFinishedFuture<>(e);
        }
    }

    /**
     * @param cacheName Cache name.
     * @return Cache configuration, or {@code null} if no template with matching name found.
     * @throws IgniteCheckedException If failed.
     */
    public CacheConfiguration getConfigFromTemplate(String cacheName) throws IgniteCheckedException {
        DynamicCacheDescriptor cfgTemplate = null;

        DynamicCacheDescriptor dfltCacheCfg = null;

        List<DynamicCacheDescriptor> wildcardNameCfgs = null;

        for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
            assert desc.template();

            CacheConfiguration cfg = desc.cacheConfiguration();

            assert cfg != null;

            if (F.eq(cacheName, cfg.getName())) {
                cfgTemplate = desc;

                break;
            }

            if (cfg.getName() != null) {
                if (GridCacheUtils.isCacheTemplateName(cfg.getName())) {
                    if (cfg.getName().length() > 1) {
                        if (wildcardNameCfgs == null)
                            wildcardNameCfgs = new ArrayList<>();

                        wildcardNameCfgs.add(desc);
                    }
                    else
                        dfltCacheCfg = desc; // Template with name '*'.
                }
            }
            else if (dfltCacheCfg == null)
                dfltCacheCfg = desc;
        }

        if (cfgTemplate == null && cacheName != null && wildcardNameCfgs != null) {
            wildcardNameCfgs.sort((a, b) ->
                Integer.compare(b.cacheConfiguration().getName().length(), a.cacheConfiguration().getName().length()));

            for (DynamicCacheDescriptor desc : wildcardNameCfgs) {
                String wildcardCacheName = desc.cacheConfiguration().getName();

                if (cacheName.startsWith(wildcardCacheName.substring(0, wildcardCacheName.length() - 1))) {
                    cfgTemplate = desc;

                    break;
                }
            }
        }

        if (cfgTemplate == null)
            cfgTemplate = dfltCacheCfg;

        if (cfgTemplate == null)
            return null;

        // It's safe to enrich cache configuration here because we requested this cache from current node.
        CacheConfiguration enrichedTemplate = enricher().enrichFully(
            cfgTemplate.cacheConfiguration(), cfgTemplate.cacheConfigurationEnrichment());

        enrichedTemplate = cloneCheckSerializable(enrichedTemplate);

        CacheConfiguration cfg = new CacheConfiguration(enrichedTemplate);

        cfg.setName(cacheName);

        return cfg;
    }

    /**
     * @param cacheName Cache name.
     * @return Cache configuration.
     * @throws IgniteCheckedException If failed.
     */
    private CacheConfiguration getOrCreateConfigFromTemplate(String cacheName) throws IgniteCheckedException {
        CacheConfiguration cfg = getConfigFromTemplate(cacheName);

        return cfg != null ? cfg : new CacheConfiguration(cacheName);
    }

    /**
     * Dynamically starts cache.
     *
     * @param ccfg Cache configuration.
     * @param cacheName Cache name.
     * @param nearCfg Near cache configuration.
     * @param failIfExists Fail if exists flag.
     * @param failIfNotStarted If {@code true} fails if cache is not started.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Future that will be completed when cache is deployed.
     */
    public IgniteInternalFuture<Boolean> dynamicStartCache(
        @Nullable CacheConfiguration ccfg,
        String cacheName,
        @Nullable NearCacheConfiguration nearCfg,
        boolean failIfExists,
        boolean failIfNotStarted,
        boolean checkThreadTx
    ) {
        return dynamicStartCache(ccfg,
            cacheName,
            nearCfg,
            CacheType.USER,
            false,
            failIfExists,
            failIfNotStarted,
            checkThreadTx);
    }

    /**
     * Dynamically starts cache as a result of SQL {@code CREATE TABLE} command.
     *
     * @param ccfg Cache configuration.
     */
    public IgniteInternalFuture<Boolean> dynamicStartSqlCache(
        CacheConfiguration ccfg
    ) {
        A.notNull(ccfg, "ccfg");

        return dynamicStartCache(ccfg,
            ccfg.getName(),
            ccfg.getNearConfiguration(),
            CacheType.USER,
            true,
            false,
            true,
            true);
    }

    /**
     * Dynamically starts cache.
     *
     * @param ccfg Cache configuration.
     * @param cacheName Cache name.
     * @param nearCfg Near cache configuration.
     * @param cacheType Cache type.
     * @param sql If the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
     * @param failIfExists Fail if exists flag.
     * @param failIfNotStarted If {@code true} fails if cache is not started.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Future that will be completed when cache is deployed.
     */
    public IgniteInternalFuture<Boolean> dynamicStartCache(
        @Nullable CacheConfiguration ccfg,
        String cacheName,
        @Nullable NearCacheConfiguration nearCfg,
        CacheType cacheType,
        boolean sql,
        boolean failIfExists,
        boolean failIfNotStarted,
        boolean checkThreadTx
    ) {
        assert cacheName != null;

        if (checkThreadTx) {
            sharedCtx.tm().checkEmptyTransactions(
                () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicStartCache"));
        }

        GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> startCacheClsr =
            (grpKeys, masterKeyDigest) -> {
                assert ccfg == null || !ccfg.isEncryptionEnabled() || !grpKeys.isEmpty();

                DynamicCacheChangeRequest req = prepareCacheChangeRequest(
                    ccfg,
                    cacheName,
                    nearCfg,
                    cacheType,
                    sql,
                    failIfExists,
                    failIfNotStarted,
                    null,
                    false,
                    null,
                    ccfg != null && ccfg.isEncryptionEnabled() ? grpKeys.iterator().next() : null,
                    null,
                    ccfg != null && ccfg.isEncryptionEnabled() ? masterKeyDigest : null);

                if (req != null) {
                    if (req.clientStartOnly())
                        return startClientCacheChange(F.asMap(req.cacheName(), req), null);

                    return F.first(initiateCacheChanges(F.asList(req)));
                }
                else
                    return new GridFinishedFuture<>();
            };

        try {
            if (ccfg != null && ccfg.isEncryptionEnabled()) {
                ctx.encryption().checkEncryptedCacheSupported();

                return generateEncryptionKeysAndStartCacheAfter(1, startCacheClsr);
            }

            return startCacheClsr.apply(Collections.EMPTY_SET, null);
        }
        catch (Exception e) {
            return new GridFinishedFuture<>(e);
        }
    }

    /**
     * Checks that cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     *
     * @param opName Operation name.
     * @param cfgs Stored cache configurations.
     * @throws CacheException If cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     */
    private void checkReadOnlyState(String opName, Collection<StoredCacheData> cfgs) {
        IgniteOutClosure<String> cacheNameClo = null;
        IgniteOutClosure<String> cacheGrpNameClo = null;

        if (!F.isEmpty(cfgs)) {
            if (cfgs.size() == 1) {
                CacheConfiguration cfg = cfgs.iterator().next().config();

                cacheNameClo = cfg::getName;
                cacheGrpNameClo = cfg::getGroupName;
            }
            else {
                cacheNameClo = () -> cfgs.stream()
                    .map(StoredCacheData::config)
                    .map(CacheConfiguration::getName)
                    .collect(toList()).toString();

                cacheGrpNameClo = () -> cfgs.stream()
                    .map(StoredCacheData::config)
                    .map(CacheConfiguration::getGroupName)
                    .collect(toList()).toString();
            }
        }

        checkReadOnlyState(opName, cacheGrpNameClo, cacheNameClo);
    }

    /**
     * Checks that cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     *
     * @param opName Operation name.
     * @param cfgs Cache configurations.
     * @throws CacheException If cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     */
    public void checkReadOnlyState(String opName, CacheConfiguration... cfgs) {
        IgniteOutClosure<String> cacheNameClo = null;
        IgniteOutClosure<String> cacheGrpNameClo = null;

        if (!F.isEmpty(cfgs)) {
            if (cfgs.length == 1) {
                cacheNameClo = () -> cfgs[0] == null ? null : cfgs[0].getName();
                cacheGrpNameClo = () -> cfgs[0] == null ? null : cfgs[0].getGroupName();
            }
            else {
                cacheNameClo = () -> Stream.of(cfgs)
                    .map(CacheConfiguration::getName)
                    .collect(toList()).toString();

                cacheGrpNameClo = () -> Stream.of(cfgs)
                    .map(CacheConfiguration::getGroupName)
                    .collect(toList()).toString();
            }
        }

        checkReadOnlyState(opName, cacheGrpNameClo, cacheNameClo);
    }

    /**
     * Checks that cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     *
     * @param opName Operation name.
     * @param cacheGrpNameClo Closure for getting cache group name, if it needed (optional).
     * @param cacheNameClo Closure for getting cache name, if it needed (optional).
     * @throws CacheException If cluster in a {@link ClusterState#ACTIVE_READ_ONLY} state.
     */
    private void checkReadOnlyState(
        String opName,
        @Nullable IgniteOutClosure<String> cacheGrpNameClo,
        @Nullable IgniteOutClosure<String> cacheNameClo
    ) {
        if (sharedCtx.readOnlyMode()) {
            String cacheName = cacheNameClo == null ? null : cacheNameClo.apply();
            String cacheGrpName = cacheGrpNameClo == null ? null : cacheGrpNameClo.apply();

            String errorMsg = format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT, opName, cacheGrpName, cacheName);

            throw new CacheException(new IgniteClusterReadOnlyException(errorMsg));
        }
    }

    /**
     * Send {@code GenerateEncryptionKeyRequest} and execute {@code after} closure if succeed.
     *
     * @param keyCnt Count of keys to generate.
     * @param after Closure to execute after encryption keys would be generated.
     */
    private IgniteInternalFuture<Boolean> generateEncryptionKeysAndStartCacheAfter(int keyCnt,
        GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> after) {
        IgniteInternalFuture<T2<Collection<byte[]>, byte[]>> genEncKeyFut = ctx.encryption().generateKeys(keyCnt);

        GridFutureAdapter<Boolean> res = new GridFutureAdapter<>();

        genEncKeyFut.listen(new IgniteInClosure<IgniteInternalFuture<T2<Collection<byte[]>, byte[]>>>() {
            @Override public void apply(IgniteInternalFuture<T2<Collection<byte[]>, byte[]>> fut) {
                try {
                    Collection<byte[]> grpKeys = fut.result().get1();
                    byte[] masterKeyDigest = fut.result().get2();

                    if (F.size(grpKeys, F.alwaysTrue()) != keyCnt)
                        res.onDone(false, fut.error());

                    IgniteInternalFuture<Boolean> dynStartCacheFut = after.apply(grpKeys, masterKeyDigest);

                    dynStartCacheFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                        @Override public void apply(IgniteInternalFuture<Boolean> fut) {
                            try {
                                res.onDone(fut.get(), fut.error());
                            }
                            catch (IgniteCheckedException e) {
                                res.onDone(false, e);
                            }
                        }
                    });
                }
                catch (Exception e) {
                    res.onDone(false, e);
                }
            }
        });

        return res;
    }

    /**
     * @param startReqs Start requests.
     * @param cachesToClose Cache tp close.
     * @return Future for cache change operation.
     */
    private IgniteInternalFuture<Boolean> startClientCacheChange(
        @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) {
        assert startReqs != null ^ cachesToClose != null;

        DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID());

        IgniteInternalFuture old = pendingFuts.put(fut.id, fut);

        assert old == null : old;

        ctx.discovery().clientCacheStartEvent(fut.id, startReqs, cachesToClose);

        IgniteCheckedException err = checkNodeState();

        if (err != null)
            fut.onDone(err);

        return fut;
    }

    /**
     * Dynamically starts multiple caches.
     *
     * @param ccfgList Collection of cache configuration.
     * @param failIfExists Fail if exists flag.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
     * @return Future that will be completed when all caches are deployed.
     */
    public IgniteInternalFuture<Boolean> dynamicStartCaches(
        Collection<CacheConfiguration> ccfgList,
        boolean failIfExists,
        boolean checkThreadTx,
        boolean disabledAfterStart
    ) {
        return dynamicStartCachesByStoredConf(
            ccfgList.stream().map(StoredCacheData::new).collect(toList()),
            failIfExists,
            checkThreadTx,
            disabledAfterStart,
            null);
    }

    /**
     * Dynamically starts multiple caches.
     *
     * @param storedCacheDataList Collection of stored cache data.
     * @param failIfExists Fail if exists flag.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
     * @param restartId Restart requester id (it'll allow to start this cache only him).
     * @return Future that will be completed when all caches are deployed.
     */
    public IgniteInternalFuture<Boolean> dynamicStartCachesByStoredConf(
        Collection<StoredCacheData> storedCacheDataList,
        boolean failIfExists,
        boolean checkThreadTx,
        boolean disabledAfterStart,
        IgniteUuid restartId
    ) {
        if (checkThreadTx) {
            sharedCtx.tm().checkEmptyTransactions(() -> {
                List<String> cacheNames = storedCacheDataList.stream()
                    .map(StoredCacheData::config)
                    .map(CacheConfiguration::getName)
                    .collect(toList());

                return format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "dynamicStartCachesByStoredConf");
            });
        }

        GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> startCacheClsr = (grpKeys, masterKeyDigest) -> {
            List<DynamicCacheChangeRequest> srvReqs = null;
            Map<String, DynamicCacheChangeRequest> clientReqs = null;

            Iterator<byte[]> grpKeysIter = grpKeys.iterator();

            for (StoredCacheData ccfg : storedCacheDataList) {
                assert ccfg.groupKeyEncrypted() == null || ccfg.config().isEncryptionEnabled();

                // Reuse encription key if passed for this group. Take next generated otherwise.
                GroupKeyEncrypted encrKey = ccfg.config().isEncryptionEnabled() ? (ccfg.groupKeyEncrypted() != null ?
                    ccfg.groupKeyEncrypted() : new GroupKeyEncrypted(0, grpKeysIter.next())) : null;

                DynamicCacheChangeRequest req = prepareCacheChangeRequest(
                    ccfg.config(),
                    ccfg.config().getName(),
                    null,
                    resolveCacheType(ccfg.config()),
                    ccfg.sql(),
                    failIfExists,
                    true,
                    restartId,
                    disabledAfterStart,
                    ccfg.queryEntities(),
                    encrKey != null ? encrKey.key() : null,
                    encrKey != null ? encrKey.id() : null,
                    encrKey != null ? masterKeyDigest : null
                );

                if (req != null) {
                    if (req.clientStartOnly()) {
                        if (clientReqs == null)
                            clientReqs = U.newLinkedHashMap(storedCacheDataList.size());

                        clientReqs.put(req.cacheName(), req);
                    }
                    else {
                        if (srvReqs == null)
                            srvReqs = new ArrayList<>(storedCacheDataList.size());

                        srvReqs.add(req);
                    }
                }
            }

            if (srvReqs == null && clientReqs == null)
                return new GridFinishedFuture<>();

            if (clientReqs != null && srvReqs == null)
                return startClientCacheChange(clientReqs, null);

            GridCompoundFuture<?, Boolean> compoundFut = new GridCompoundFuture<>();

            for (DynamicCacheStartFuture fut : initiateCacheChanges(srvReqs))
                compoundFut.add((IgniteInternalFuture)fut);

            if (clientReqs != null) {
                IgniteInternalFuture<Boolean> clientStartFut = startClientCacheChange(clientReqs, null);

                compoundFut.add((IgniteInternalFuture)clientStartFut);
            }

            compoundFut.markInitialized();

            return compoundFut;
        };

        int encGrpCnt = 0;

        for (StoredCacheData ccfg : storedCacheDataList) {
            if (ccfg.config().isEncryptionEnabled())
                encGrpCnt++;
        }

        return generateEncryptionKeysAndStartCacheAfter(encGrpCnt, startCacheClsr);
    }

    /** Resolve cache type for input cacheType */
    private @NotNull CacheType resolveCacheType(CacheConfiguration ccfg) {
        if (CU.isUtilityCache(ccfg.getName()))
            return CacheType.UTILITY;
        else if (internalCaches.contains(ccfg.getName()))
            return CacheType.INTERNAL;
        else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName()))
            return CacheType.DATA_STRUCTURES;
        else
            return CacheType.USER;
    }

    /**
     * @param cacheName Cache name to destroy.
     * @param sql If the cache needs to be destroyed only if it was created as the result of SQL {@code CREATE TABLE}
     * command.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @param restart Restart flag.
     * @param restartId Restart requester id (it'll allow to start this cache only him).
     * @return Future that will be completed when cache is destroyed.
     */
    public IgniteInternalFuture<Boolean> dynamicDestroyCache(
        String cacheName,
        boolean sql,
        boolean checkThreadTx,
        boolean restart,
        IgniteUuid restartId
    ) {
        assert cacheName != null;

        if (checkThreadTx) {
            sharedCtx.tm().checkEmptyTransactions(
                () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicDestroyCache"));
        }

        checkReadOnlyState("dynamic destroy cache", null, cacheName::toString);

        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, sql, true);

        req.stop(true);
        req.destroy(true);
        req.restart(restart);
        req.restartId(restartId);

        return F.first(initiateCacheChanges(F.asList(req)));
    }

    /**
     * @param cacheNames Collection of cache names to destroy.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Future that will be completed when cache is destroyed.
     */
    public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx) {
        return dynamicDestroyCaches(cacheNames, checkThreadTx, true);
    }

    /**
     * @param cacheNames Collection of cache names to destroy.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @param destroy Cache data destroy flag. Setting to <code>true</code> will cause removing all cache data
     * @return Future that will be completed when cache is destroyed.
     */
    public IgniteInternalFuture<?> dynamicDestroyCaches(
        Collection<String> cacheNames,
        boolean checkThreadTx,
        boolean destroy
    ) {
        if (checkThreadTx) {
            sharedCtx.tm().checkEmptyTransactions(
                () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "dynamicDestroyCaches"));
        }

        if (!F.isEmpty(cacheNames))
            checkReadOnlyState("dynamic destroy caches", null, cacheNames::toString);

        return dynamicChangeCaches(
            cacheNames.stream().map(cacheName -> createStopRequest(cacheName, false, null, destroy))
                .collect(toList())
        );
    }

    /**
     * Prepares cache stop request.
     *
     * @param cacheName Cache names to destroy.
     * @param restart Restart flag.
     * @param restartId Restart requester id (it'll allow to start this cache only him).
     * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store.
     * @return Future that will be completed when cache is destroyed.
     */
    public @NotNull DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true);

        req.stop(true);
        req.destroy(destroy);
        req.restart(restart);
        req.restartId(restartId);

        return req;
    }

    /**
     * Starts cache stop request as cache change batch.
     *
     * @param reqs cache stop requests.
     * @return compound future.
     */
    @NotNull public IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
        return initiateCacheChanges(reqs).stream().collect(IgniteCollectors.toCompoundFuture());
    }

    /**
     * @param cacheName Cache name to close.
     * @return Future that will be completed when cache is closed.
     */
    IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
        assert cacheName != null;

        IgniteCacheProxy<?, ?> proxy = jcacheProxy(cacheName, false);

        if (proxy == null || proxy.isProxyClosed())
            return new GridFinishedFuture<>(); // No-op.

        sharedCtx.tm().checkEmptyTransactions(
            () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicCloseCache"));

        return startClientCacheChange(null, Collections.singleton(cacheName));
    }

    /**
     * Resets cache state after the cache has been moved to recovery state.
     *
     * @param cacheNames Cache names.
     * @return Future that will be completed when state is changed for all caches.
     */
    public IgniteInternalFuture<?> resetCacheState(Collection<String> cacheNames) throws ClusterTopologyCheckedException {
        sharedCtx.tm().checkEmptyTransactions(
            () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "resetCacheState"));

        Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());

        for (String cacheName : cacheNames) {
            final IgniteInternalCache<Object, Object> cache0 = internalCache(cacheName);

            if (cache0 == null)
                continue;

            // Check if all lost partitions has at least one affinity owner.
            final Collection<Integer> lostParts = cache0.lostPartitions();

            if (lostParts.isEmpty())
                continue;

            for (Integer part : lostParts) {
                final Collection<ClusterNode> owners = cache0.affinity().mapPartitionToPrimaryAndBackups(part);

                if (owners.isEmpty())
                    throw new ClusterTopologyCheckedException("Cannot reset lost partitions because no baseline nodes " +
                        "are online [cache=" + cacheName + ", partition=" + part + ']');
            }

            DynamicCacheChangeRequest req = DynamicCacheChangeRequest.resetLostPartitions(ctx, cacheName);

            reqs.add(req);
        }

        return initiateCacheChanges(reqs).stream().collect(IgniteCollectors.toCompoundFuture());
    }

    /**
     * Finalizes partitions update counters.
     *
     * @return Future that will be completed when state is changed for all caches.
     */
    public IgniteInternalFuture<?> finalizePartitionsCounters() {
        sharedCtx.tm().checkEmptyTransactions(
            () -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, null, "finalizePartitionUpdateCounters"));

        Collection<DynamicCacheChangeRequest> reqs = Collections.singleton(DynamicCacheChangeRequest.finalizePartitionCounters(ctx));

        return initiateCacheChanges(reqs).stream().collect(IgniteCollectors.toCompoundFuture());
    }

    /**
     * @param cacheName Cache name.
     * @return Cache type.
     */
    public CacheType cacheType(String cacheName) {
        if (CU.isUtilityCache(cacheName))
            return CacheType.UTILITY;
        else if (internalCaches.contains(cacheName))
            return CacheType.INTERNAL;
        else if (DataStructuresProcessor.isDataStructureCache(cacheName))
            return CacheType.DATA_STRUCTURES;
        else
            return CacheType.USER;
    }

    /**
     * @return {@code True} if cache group {@code cacheGrpId} is encrypted. {@code False} otherwise.
     */
    public boolean isEncrypted(int cacheGrpId) {
        return cacheGrpId != MetaStorage.METASTORAGE_CACHE_ID && cacheGroup(cacheGrpId).config().isEncryptionEnabled();
    }

    /**
     * Save cache configuration to persistent store if necessary.
     *
     * @param desc Cache descriptor.
     */
    public void saveCacheConfiguration(DynamicCacheDescriptor desc) throws IgniteCheckedException {
        assert desc != null;

        locCfgMgr.saveCacheConfiguration(desc.toStoredData(splitter), true);
    }

    /**
     * Remove all persistent files for all registered caches.
     */
    public void cleanupCachesDirectories() throws IgniteCheckedException {
        if (sharedCtx.pageStore() == null || sharedCtx.kernalContext().clientNode())
            return;

        for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
            if (isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
                sharedCtx.pageStore().cleanupPersistentSpace(desc.cacheConfiguration());
        }
    }

    /**
     * @param reqs Requests.
     * @return Collection of futures.
     */
    private Collection<DynamicCacheStartFuture> initiateCacheChanges(
        Collection<DynamicCacheChangeRequest> reqs
    ) {
        Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());

        Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());

        for (DynamicCacheChangeRequest req : reqs) {
            authorizeCacheChange(ctx.security(), req);

            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId());

            try {
                if (req.stop()) {
                    DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());

                    if (desc == null)
                        // No-op.
                        fut.onDone(false);
                }

                if (req.start() && req.startCacheConfiguration() != null) {
                    CacheConfiguration ccfg = req.startCacheConfiguration();

                    try {
                        cachesInfo.validateStartCacheConfiguration(ccfg);
                    }
                    catch (IgniteCheckedException e) {
                        fut.onDone(e);
                    }
                }

                if (fut.isDone())
                    continue;

                DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(
                    req.requestId(), fut);

                assert old == null;

                if (fut.isDone())
                    continue;

                sndReqs.add(req);
            }
            catch (Exception e) {
                fut.onDone(e);
            }
            finally {
                res.add(fut);
            }
        }

        Exception err = null;

        if (!sndReqs.isEmpty()) {
            try {
                ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));

                err = checkNodeState();
            }
            catch (IgniteCheckedException e) {
                err = e;
            }
        }

        if (err != null) {
            for (DynamicCacheStartFuture fut : res)
                fut.onDone(err);
        }

        return res;
    }

    /**
     * Authorizes cache change request.
     *
     * @param security Security.
     * @param req Cache change request.
     */
    static void authorizeCacheChange(IgniteSecurity security, DynamicCacheChangeRequest req) {
        if (req.cacheType() == null || req.cacheType() == CacheType.USER) {
            if (req.start())
                authorizeCacheCreate(security, req.startCacheConfiguration());
            else if (req.stop())
                authorizeCacheDestroy(security, req.cacheName());
        }
    }

    /**
     * Authorizes cache destroy.
     *
     * @param security Security.
     * @param cacheName Cache name.
     */
    static void authorizeCacheDestroy(IgniteSecurity security, String cacheName) {
        security.authorize(cacheName, SecurityPermission.CACHE_DESTROY);
    }

    /**
     * Authorizes cache create.
     *
     * @param security Security.
     * @param cacheCfg Cache configuration.
     */
    static void authorizeCacheCreate(IgniteSecurity security, @Nullable CacheConfiguration cacheCfg) {
        if (cacheCfg == null)
            return;

        security.authorize(cacheCfg.getName(), SecurityPermission.CACHE_CREATE);

        if (cacheCfg.isOnheapCacheEnabled() &&
                IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_ONHEAP_CACHE))
            throw new SecurityException("Authorization failed for enabling on-heap cache.");
    }

    /**
     * @return Non null exception if node is stopping or disconnected.
     */
    private @Nullable IgniteCheckedException checkNodeState() {
        if (ctx.isStopping()) {
            return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
                "node is stopping.");
        }
        else if (ctx.clientDisconnected()) {
            return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
                "Failed to execute dynamic cache change request, client node disconnected.");
        }

        return null;
    }

    /**
     * @param type Event type.
     * @param customMsg Custom message instance.
     * @param node Event node.
     * @param topVer Topology version.
     * @param state Cluster state.
     */
    public void onDiscoveryEvent(
        int type,
        @Nullable DiscoveryCustomMessage customMsg,
        ClusterNode node,
        AffinityTopologyVersion topVer,
        DiscoveryDataClusterState state
    ) {
        cachesInfo.onDiscoveryEvent(type, node, topVer);

        sharedCtx.affinity().onDiscoveryEvent(type, customMsg, node, topVer, state);
    }

    /**
     * Callback invoked from discovery thread when discovery custom message is received.
     *
     * @param msg Customer message.
     * @param topVer Current topology version.
     * @param node Node sent message.
     * @return {@code True} if minor topology version should be increased.
     */
    public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer, ClusterNode node) {
        if (msg instanceof SchemaAbstractDiscoveryMessage) {
            ctx.query().onDiscovery((SchemaAbstractDiscoveryMessage)msg);

            return false;
        }

        if (msg instanceof CacheAffinityChangeMessage)
            return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));

        if (msg instanceof SnapshotDiscoveryMessage &&
            ((SnapshotDiscoveryMessage)msg).needExchange())
            return true;

        if (msg instanceof WalStateAbstractMessage) {
            WalStateAbstractMessage msg0 = (WalStateAbstractMessage)msg;

            if (msg0 instanceof WalStateProposeMessage)
                sharedCtx.walState().onProposeDiscovery((WalStateProposeMessage)msg);
            else if (msg0 instanceof WalStateFinishMessage)
                sharedCtx.walState().onFinishDiscovery((WalStateFinishMessage)msg);

            return msg0.needExchange();
        }

        if (msg instanceof DynamicCacheChangeBatch) {
            boolean changeRequested = cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);

            ctx.query().onCacheChangeRequested((DynamicCacheChangeBatch)msg);

            return changeRequested;
        }

        if (msg instanceof DynamicCacheChangeFailureMessage)
            cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer);

        if (msg instanceof ClientCacheChangeDiscoveryMessage)
            cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);

        if (msg instanceof CacheStatisticsModeChangeMessage)
            onCacheStatisticsModeChange((CacheStatisticsModeChangeMessage)msg);

        if (msg instanceof CacheStatisticsClearMessage)
            onCacheStatisticsClear((CacheStatisticsClearMessage)msg);

        if (msg instanceof TxTimeoutOnPartitionMapExchangeChangeMessage)
            sharedCtx.tm().onTxTimeoutOnPartitionMapExchangeChange((TxTimeoutOnPartitionMapExchangeChangeMessage)msg);

        return false;
    }

    /** {@inheritDoc} */
    @Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
        IgniteNodeValidationResult res = validateHashIdResolvers(node, ctx, cacheDescriptors());

        if (res == null)
            res = validateRestartingCaches(node);

        if (res == null)
            res = validateRestoringCaches(node);

        return res;
    }

    /**
     * @param cacheName Cache to check.
     * @return Cache is under restarting.
     */
    public boolean isCacheRestarting(String cacheName) {
        return cachesInfo.isRestarting(cacheName);
    }

    /**
     * @param node Joining node to validate.
     * @return Node validation result if there was an issue with the joining node, {@code null} otherwise.
     */
    private IgniteNodeValidationResult validateRestartingCaches(ClusterNode node) {
        if (cachesInfo.hasRestartingCaches()) {
            String msg = "Joining node during caches restart is not allowed [joiningNodeId=" + node.id() +
                ", restartingCaches=" + new HashSet<>(cachesInfo.restartingCaches()) + ']';

            return new IgniteNodeValidationResult(node.id(), msg);
        }

        return null;
    }

    /**
     * @param node Joining node to validate.
     * @return Node validation result if there was an issue with the joining node, {@code null} otherwise.
     */
    private IgniteNodeValidationResult validateRestoringCaches(ClusterNode node) {
        if (ctx.cache().context().snapshotMgr().isRestoring()) {
            String msg = "Joining node during caches restore is not allowed [joiningNodeId=" + node.id() + ']';

            return new IgniteNodeValidationResult(node.id(), msg);
        }

        return null;
    }

    /**
     * @return Keep static cache configuration flag. If {@code true}, static cache configuration will override
     * configuration persisted on disk.
     */
    public boolean keepStaticCacheConfiguration() {
        return keepStaticCacheConfiguration;
    }

    /**
     * @param name Cache name.
     * @param <K> type of keys.
     * @param <V> type of values.
     * @return Cache instance for given name.
     */
    public <K, V> @Nullable IgniteInternalCache<K, V> cache(String name) {
        assert name != null;

        if (log.isDebugEnabled())
            log.debug("Getting cache for name: " + name);

        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);

        return jcache == null ? null : jcache.internalProxy();
    }

    /**
     * Await proxy initialization.
     *
     * @param jcache Cache proxy.
     */
    private void awaitInitializeProxy(IgniteCacheProxyImpl<?, ?> jcache) {
        if (jcache != null) {
            CountDownLatch initLatch = jcache.getInitLatch();

            try {
                while (initLatch.getCount() > 0) {
                    initLatch.await(2000, TimeUnit.MILLISECONDS);

                    if (log.isInfoEnabled())
                        log.info("Failed to wait proxy initialization, cache=" + jcache.getName() +
                            ", localNodeId=" + ctx.localNodeId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Ignore intteruption.
            }
        }
    }

    /**
     * @param name Cache name.
     */
    public void completeProxyInitialize(String name) {
        IgniteCacheProxyImpl<?, ?> jcache = jCacheProxies.get(name);

        if (jcache != null) {
            CountDownLatch proxyInitLatch = jcache.getInitLatch();

            if (proxyInitLatch.getCount() > 0) {
                if (log.isInfoEnabled())
                    log.info("Finish proxy initialization, cacheName=" + name +
                        ", localNodeId=" + ctx.localNodeId());

                proxyInitLatch.countDown();
            }
        }
        else {
            if (log.isInfoEnabled())
                log.info("Can not finish proxy initialization because proxy does not exist, cacheName=" + name +
                    ", localNodeId=" + ctx.localNodeId());
        }
    }

    /**
     * @param name Cache name.
     * @return Cache instance for given name.
     * @throws IgniteCheckedException If failed.
     */
    public <K, V> IgniteInternalCache<K, V> getOrStartCache(String name) throws IgniteCheckedException {
        return getOrStartCache(name, null);
    }

    /**
     * @param name Cache name.
     * @return Cache instance for given name.
     * @throws IgniteCheckedException If failed.
     */
    public <K, V> IgniteInternalCache<K, V> getOrStartCache(
        String name,
        CacheConfiguration ccfg
    ) throws IgniteCheckedException {
        assert name != null;

        if (log.isDebugEnabled())
            log.debug("Getting cache for name: " + name);

        IgniteCacheProxy<?, ?> cache = jcacheProxy(name, true);

        if (cache == null) {
            dynamicStartCache(ccfg, name, null, false, ccfg == null, true).get();

            cache = jcacheProxy(name, true);
        }

        return cache == null ? null : (IgniteInternalCache<K, V>)cache.internalProxy();
    }

    /**
     * @return All configured cache instances.
     */
    public Collection<IgniteInternalCache<?, ?>> caches() {
        return F.viewReadOnly(jCacheProxies.values(),
            (IgniteClosure<IgniteCacheProxy<?, ?>, IgniteInternalCache<?, ?>>)IgniteCacheProxy::internalProxy);
    }

    /**
     * @return All configured cache instances.
     */
    public Collection<IgniteCacheProxy<?, ?>> jcaches() {
        return F.viewReadOnly(jCacheProxies.values(),
            (IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>)IgniteCacheProxyImpl::gatewayWrapper);
    }

    /**
     * Gets utility cache.
     *
     * @return Utility cache.
     */
    public <K, V> IgniteInternalCache<K, V> utilityCache() {
        return internalCacheEx(CU.UTILITY_CACHE_NAME);
    }

    /**
     * @param name Cache name.
     * @return Cache.
     */
    private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) {
        if (ctx.discovery().localNode().isClient()) {
            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);

            if (proxy == null) {
                GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);

                if (cacheAdapter != null) {
                    proxy = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false);

                    IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)proxy);

                    if (prev != null)
                        proxy = (IgniteCacheProxy<K, V>)prev;

                    completeProxyInitialize(proxy.getName());
                }
            }

            assert proxy != null : name;

            return proxy.internalProxy();
        }

        return internalCache(name);
    }

    /**
     * @param name Cache name.
     * @param <K> type of keys.
     * @param <V> type of values.
     * @return Cache instance for given name.
     * @throws IllegalArgumentException If cache not exists.
     */
    public <K, V> IgniteInternalCache<K, V> publicCache(String name) {
        assert name != null;

        if (log.isDebugEnabled())
            log.debug("Getting public cache for name: " + name);

        DynamicCacheDescriptor desc = cacheDescriptor(name);

        if (desc == null)
            throw new IllegalArgumentException("Cache is not started: " + name);

        if (!desc.cacheType().userCache())
            throw new IllegalStateException("Failed to get cache because it is a system cache: " + name);

        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);

        if (jcache == null)
            throw new IllegalArgumentException("Cache is not started: " + name);

        return jcache.internalProxy();
    }

    /**
     * @param cacheName Cache name.
     * @param <K> type of keys.
     * @param <V> type of values.
     * @return Cache instance for given name.
     * @throws IgniteCheckedException If failed.
     */
    public <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName) throws IgniteCheckedException {
        return publicJCache(cacheName, true, true);
    }

    /**
     * @param cacheName Cache name.
     * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
     * otherwise returns {@code null} in this case.
     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
     * @return Cache instance for given name.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings({"ConstantConditions"})
    public @Nullable <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
        boolean failIfNotStarted,
        boolean checkThreadTx) throws IgniteCheckedException {
        assert cacheName != null;

        if (log.isDebugEnabled())
            log.debug("Getting public cache for name: " + cacheName);

        DynamicCacheDescriptor desc = cacheDescriptor(cacheName);

        if (desc != null && !desc.cacheType().userCache())
            throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);

        IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, true);

        // Try to start cache, there is no guarantee that cache will be instantiated.
        if (proxy == null) {
            dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();

            proxy = jcacheProxy(cacheName, true);
        }

        return proxy != null ? (IgniteCacheProxy<K, V>)proxy.gatewayWrapper() : null;
    }

    /**
     * Get configuration for the given cache.
     *
     * @param name Cache name.
     * @return Cache configuration.
     */
    public CacheConfiguration cacheConfiguration(String name) {
        assert name != null;

        DynamicCacheDescriptor desc = cacheDescriptor(name);

        if (desc == null) {
            if (cachesInfo.isRestarting(name)) {
                IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(name);

                assert proxy != null : name;

                proxy.internalProxy(); //should throw exception

                // we have procceed, try again
                return cacheConfiguration(name);
            }

            throw new IllegalStateException("Cache doesn't exist: " + name);
        }
        else
            return desc.cacheConfiguration();
    }

    /**
     * Get registered cache descriptor.
     *
     * @param name Name.
     * @return Descriptor.
     */
    public DynamicCacheDescriptor cacheDescriptor(String name) {
        return cachesInfo.registeredCaches().get(name);
    }

    /**
     * @return Cache descriptors.
     */
    public Map<String, DynamicCacheDescriptor> cacheDescriptors() {
        return cachesInfo.registeredCaches();
    }

    /**
     * @return Collection of persistent cache descriptors.
     */
    public Collection<DynamicCacheDescriptor> persistentCaches() {
        return cachesInfo.registeredCaches().values()
            .stream()
            .filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration()))
            .collect(toList());
    }

    /**
     * @return Collection of persistent cache group descriptors.
     */
    public Collection<CacheGroupDescriptor> persistentGroups() {
        return cachesInfo.registeredCacheGroups().values()
            .stream()
            .filter(CacheGroupDescriptor::persistenceEnabled)
            .collect(toList());
    }

    /**
     * @return Cache group descriptors.
     */
    public Map<Integer, CacheGroupDescriptor> cacheGroupDescriptors() {
        return cachesInfo.registeredCacheGroups();
    }

    /**
     * Tries to find cache group descriptor either in registered cache groups
     * or in marked for deletion collection if cache group is considered to be stopped.
     *
     * @param grpId Group id.
     */
    public CacheGroupDescriptor cacheGroupDescriptor(int grpId) {
        CacheGroupDescriptor desc = cacheGroupDescriptors().get(grpId);

        // Try to find descriptor if it was marked for deletion.
        if (desc == null)
            return cachesInfo.markedForDeletionCacheGroupDesc(grpId);

        return desc;
    }

    /**
     * @param cacheId Cache ID.
     * @return Cache descriptor.
     */
    public @Nullable DynamicCacheDescriptor cacheDescriptor(int cacheId) {
        return cachesInfo.registeredCachesById().get(cacheId);
    }

    /**
     * @param cacheCfg Cache configuration template.
     * @throws IgniteCheckedException If failed.
     */
    public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
        assert cacheCfg.getName() != null;

        String name = cacheCfg.getName();

        DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(name);

        if (desc != null)
            return;

        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg,
            backwardCompatibleSplitter().split(cacheCfg));

        TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());

        TemplateConfigurationFuture old =
            (TemplateConfigurationFuture)pendingTemplateFuts.putIfAbsent(cacheCfg.getName(), fut);

        if (old != null)
            fut = old;

        Exception err = null;

        try {
            ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));

            if (ctx.isStopping()) {
                err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
                    "node is stopping.");
            }
            else if (ctx.clientDisconnected()) {
                err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
                    "Failed to execute dynamic cache change request, client node disconnected.");
            }
        }
        catch (IgniteCheckedException e) {
            err = e;
        }

        if (err != null)
            fut.onDone(err);

        fut.get();
    }

    /**
     * @param name Cache name.
     * @return Cache instance for given name.
     */
    @SuppressWarnings("unchecked")
    public <K, V> IgniteCacheProxy<K, V> jcache(String name) {
        assert name != null;

        IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);

        if (cache == null) {
            GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);

            if (cacheAdapter != null) {
                cache = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false);

                IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)cache);

                if (prev != null)
                    cache = (IgniteCacheProxy<K, V>)prev;

                completeProxyInitialize(cache.getName());
            }
        }

        if (cache == null)
            throw new IllegalArgumentException("Cache is not configured: " + name);

        return cache;
    }

    /**
     * @param name Cache name.
     * @param awaitInit Await proxy initialization.
     * @return Cache proxy.
     */
    public @Nullable IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
        IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name);

        if (awaitInit)
            awaitInitializeProxy(cache);

        return cache;
    }

    /**
     * @param name Cache name.
     * @param proxy Cache proxy.
     * @return Previous cache proxy.
     */
    public @Nullable IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
        return jCacheProxies.putIfAbsent(name, proxy);
    }

    /**
     * @return All configured public cache instances.
     */
    public Collection<IgniteCacheProxy<?, ?>> publicCaches() {
        Collection<IgniteCacheProxy<?, ?>> res = new ArrayList<>(jCacheProxies.size());

        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
            if (proxy.context().userCache())
                res.add(proxy.gatewayWrapper());
        }

        return res;
    }

    /**
     * @param name Cache name.
     * @param <K> type of keys.
     * @param <V> type of values.
     * @return Cache instance for given name.
     */
    public <K, V> GridCacheAdapter<K, V> internalCache(String name) {
        assert name != null;

        if (log.isDebugEnabled())
            log.debug("Getting internal cache adapter: " + name);

        return (GridCacheAdapter<K, V>)caches.get(name);
    }

    /**
     * Cancel all user operations.
     */
    private void cancelFutures() {
        sharedCtx.mvcc().onStop();

        Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");

        for (IgniteInternalFuture fut : pendingFuts.values())
            ((GridFutureAdapter)fut).onDone(err);

        for (IgniteInternalFuture fut : pendingTemplateFuts.values())
            ((GridFutureAdapter)fut).onDone(err);

        for (EnableStatisticsFuture fut : manageStatisticsFuts.values())
            fut.onDone(err);
    }

    /**
     * @return All internal cache instances.
     */
    public Collection<GridCacheAdapter<?, ?>> internalCaches() {
        return caches.values();
    }

    /**
     * @param name Cache name.
     * @return {@code True} if specified cache is system, {@code false} otherwise.
     */
    public boolean systemCache(String name) {
        assert name != null;

        DynamicCacheDescriptor desc = cacheDescriptor(name);

        return desc != null && !desc.cacheType().userCache();
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats() {
        X.println(">>> ");

        for (GridCacheAdapter c : caches.values()) {
            X.println(">>> Cache memory stats [igniteInstanceName=" + ctx.igniteInstanceName() +
                ", cache=" + c.name() + ']');

            c.context().printMemoryStats();
        }
    }

    /**
     * Callback invoked by deployment manager for whenever a class loader gets undeployed.
     *
     * @param ldr Class loader.
     */
    public void onUndeployed(ClassLoader ldr) {
        if (!ctx.isStopping()) {
            for (GridCacheAdapter<?, ?> cache : caches.values()) {
                // Do not notify system caches and caches for which deployment is disabled.
                if (cache.context().userCache() && cache.context().deploymentEnabled())
                    cache.onUndeploy(ldr);
            }
        }
    }

    /**
     * @return Shared context.
     */
    public <K, V> GridCacheSharedContext<K, V> context() {
        return (GridCacheSharedContext<K, V>)sharedCtx;
    }

    /** @return Local config manager. */
    public GridLocalConfigManager configManager() {
        return locCfgMgr;
    }

    /**
     * @return Transactions interface implementation.
     */
    public IgniteTransactionsEx transactions() {
        return transactions;
    }

    /**
     * Registers MBean for cache components.
     *
     * @param obj Cache component.
     * @param cacheName Cache name.
     * @param near Near flag.
     * @throws IgniteCheckedException If registration failed.
     */
    private void registerMbean(Object obj, @Nullable String cacheName, boolean near)
        throws IgniteCheckedException {
        if (U.IGNITE_MBEANS_DISABLED)
            return;

        assert obj != null;

        MBeanServer srvr = ctx.config().getMBeanServer();

        assert srvr != null;

        cacheName = U.maskName(cacheName);

        cacheName = near ? cacheName + "-near" : cacheName;

        final Object mbeanImpl = (obj instanceof IgniteMBeanAware) ? ((IgniteMBeanAware)obj).getMBean() : obj;

        for (Class<?> itf : mbeanImpl.getClass().getInterfaces()) {
            if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) {
                try {
                    U.registerMBean(srvr, ctx.igniteInstanceName(), cacheName, obj.getClass().getName(), mbeanImpl,
                        (Class<Object>)itf);
                }
                catch (Throwable e) {
                    throw new IgniteCheckedException("Failed to register MBean for component: " + obj, e);
                }

                break;
            }
        }
    }

    /**
     * Unregisters MBean for cache components.
     *
     * @param o Cache component.
     * @param cacheName Cache name.
     * @param near Near flag.
     */
    private void unregisterMbean(Object o, @Nullable String cacheName, boolean near) {
        if (U.IGNITE_MBEANS_DISABLED)
            return;

        assert o != null;

        MBeanServer srvr = ctx.config().getMBeanServer();

        assert srvr != null;

        cacheName = U.maskName(cacheName);

        cacheName = near ? cacheName + "-near" : cacheName;

        boolean needToUnregister = o instanceof IgniteMBeanAware;

        if (!needToUnregister) {
            for (Class<?> itf : o.getClass().getInterfaces()) {
                if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) {
                    needToUnregister = true;

                    break;
                }
            }
        }

        if (needToUnregister) {
            try {
                srvr.unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(), cacheName, o.getClass().getName()));
            }
            catch (Throwable e) {
                U.error(log, "Failed to unregister MBean for component: " + o, e);
            }
        }
    }

    /**
     * @param grp Cache group.
     * @param ccfg Cache configuration.
     * @param objs Extra components.
     * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface.
     */
    private Iterable<Object> lifecycleAwares(CacheGroupContext grp, CacheConfiguration ccfg, Object... objs) {
        Collection<Object> ret = new ArrayList<>(7 + objs.length);

        if (grp.affinityFunction() != ccfg.getAffinity())
            ret.add(ccfg.getAffinity());

        ret.add(ccfg.getAffinityMapper());
        ret.add(ccfg.getEvictionFilter());
        ret.add(ccfg.getEvictionPolicyFactory());
        ret.add(ccfg.getEvictionPolicy());
        ret.add(ccfg.getInterceptor());

        NearCacheConfiguration nearCfg = ccfg.getNearConfiguration();

        if (nearCfg != null) {
            ret.add(nearCfg.getNearEvictionPolicyFactory());
            ret.add(nearCfg.getNearEvictionPolicy());
        }

        Collections.addAll(ret, objs);

        return ret;
    }

    /**
     * @param val Object to check.
     * @return Configuration copy.
     * @throws IgniteCheckedException If validation failed.
     */
    CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
        if (val == null)
            return null;

        return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() {
            @Override public CacheConfiguration applyx() throws IgniteCheckedException {
                if (val.getCacheStoreFactory() != null) {
                    try {
                        ClassLoader ldr = ctx.config().getClassLoader();

                        if (ldr == null)
                            ldr = val.getCacheStoreFactory().getClass().getClassLoader();

                        U.unmarshal(marsh, U.marshal(marsh, val.getCacheStoreFactory()),
                            U.resolveClassLoader(ldr, ctx.config()));
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteCheckedException("Failed to validate cache configuration. " +
                            "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
                    }
                }

                try {
                    return U.unmarshal(marsh, U.marshal(marsh, val), U.resolveClassLoader(ctx.config()));
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteCheckedException("Failed to validate cache configuration " +
                        "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
                }
            }
        });
    }

    /**
     * @param c Closure.
     * @return Closure result.
     * @throws IgniteCheckedException If failed.
     */
    private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException {
        IgniteCacheObjectProcessor objProc = ctx.cacheObjects();
        BinaryContext oldCtx = null;

        if (objProc instanceof CacheObjectBinaryProcessorImpl) {
            GridBinaryMarshaller binMarsh = ((CacheObjectBinaryProcessorImpl)objProc).marshaller();

            oldCtx = binMarsh == null ? null : binMarsh.pushContext();
        }

        try {
            return c.applyx();
        }
        finally {
            if (objProc instanceof CacheObjectBinaryProcessorImpl)
                GridBinaryMarshaller.popContext(oldCtx);
        }
    }

    /**
     * Prepares DynamicCacheChangeRequest for cache creation.
     *
     * @param ccfg Cache configuration
     * @param cacheName Cache name
     * @param nearCfg Near cache configuration
     * @param cacheType Cache type
     * @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
     * @param failIfExists Fail if exists flag.
     * @param failIfNotStarted If {@code true} fails if cache is not started.
     * @param restartId Restart requester id (it'll allow to start this cache only him).
     * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
     * @param qryEntities Query entities.
     * @param encKey Encryption key.
     * @param encKeyId Id of the encryption key.
     * @param masterKeyDigest Master key digest.
     * @return Request or {@code null} if cache already exists.
     * @throws IgniteCheckedException if some of pre-checks failed
     * @throws CacheExistsException if cache exists and failIfExists flag is {@code true}
     */
    private DynamicCacheChangeRequest prepareCacheChangeRequest(
        @Nullable CacheConfiguration ccfg,
        String cacheName,
        @Nullable NearCacheConfiguration nearCfg,
        CacheType cacheType,
        boolean sql,
        boolean failIfExists,
        boolean failIfNotStarted,
        IgniteUuid restartId,
        boolean disabledAfterStart,
        @Nullable Collection<QueryEntity> qryEntities,
        @Nullable byte[] encKey,
        @Nullable Integer encKeyId,
        @Nullable byte[] masterKeyDigest
    ) throws IgniteCheckedException {
        DynamicCacheDescriptor desc = cacheDescriptor(cacheName);

        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());

        req.sql(sql);

        req.failIfExists(failIfExists);

        req.disabledAfterStart(disabledAfterStart);

        req.masterKeyDigest(masterKeyDigest);

        req.encryptionKey(encKey);

        req.encryptionKeyId(encKeyId);

        req.restartId(restartId);

        if (ccfg != null) {
            cloneCheckSerializable(ccfg);

            if (desc != null) {
                if (failIfExists) {
                    throw new CacheExistsException("Failed to start cache " +
                        "(a cache with the same name is already started): " + cacheName);
                }
                else {
                    CacheConfiguration descCfg = desc.cacheConfiguration();

                    // Check if we were asked to start a near cache.
                    if (nearCfg != null) {
                        if (isLocalAffinity(descCfg)) {
                            // If we are on a data node and near cache was enabled, return success, else - fail.
                            if (descCfg.getNearConfiguration() != null)
                                return null;
                            else
                                throw new IgniteCheckedException("Failed to start near " +
                                    "cache (local node is an affinity node for cache): " + cacheName);
                        }
                        else
                            // If local node has near cache, return success.
                            req.clientStartOnly(true);
                    }
                    else if (!isLocalAffinity(descCfg))
                        req.clientStartOnly(true);

                    req.deploymentId(desc.deploymentId());

                    T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(desc);

                    req.startCacheConfiguration(splitCfg.get1());
                    req.cacheConfigurationEnrichment(splitCfg.get2());

                    req.schema(desc.schema());
                }
            }
            else {
                CacheConfiguration cfg = new CacheConfiguration(ccfg);

                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);

                // Cache configuration must be initialized before splitting.
                initialize(cfg, cacheObjCtx);

                req.deploymentId(IgniteUuid.randomUuid());

                T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(cfg);

                req.startCacheConfiguration(splitCfg.get1());
                req.cacheConfigurationEnrichment(splitCfg.get2());

                cfg = splitCfg.get1();

                if (restartId != null)
                    req.schema(new QuerySchema(qryEntities == null ? cfg.getQueryEntities() : qryEntities));
                else
                    req.schema(new QuerySchema(qryEntities != null ? QueryUtils.normalizeQueryEntities(ctx, qryEntities, cfg)
                            : cfg.getQueryEntities()));
            }
        }
        else {
            req.clientStartOnly(true);

            if (desc != null)
                ccfg = desc.cacheConfiguration();

            if (ccfg == null) {
                if (failIfNotStarted) {
                    throw new CacheExistsException("Failed to start client cache " +
                        "(a cache with the given name is not started): " + cacheName);
                }
                else
                    return null;
            }

            req.deploymentId(desc.deploymentId());

            T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(ccfg);

            req.startCacheConfiguration(splitCfg.get1());
            req.cacheConfigurationEnrichment(splitCfg.get2());

            req.schema(desc.schema());
        }

        if (nearCfg != null)
            req.nearCacheConfiguration(nearCfg);

        req.cacheType(cacheType);

        return req;
    }

    /**
     * Enable/disable statistics globally for the caches
     *
     * @param cacheNames Collection of cache names.
     * @param enabled Statistics enabled flag.
     */
    public void enableStatistics(Collection<String> cacheNames, boolean enabled) throws IgniteCheckedException {
        Collection<IgniteInternalCache> caches = manageStatisticsCaches(cacheNames);

        Collection<String> globalCaches = new HashSet<>(U.capacity(caches.size()));

        for (IgniteInternalCache cache : caches) {
            cache.context().statisticsEnabled(enabled);

            globalCaches.add(cache.name());
        }

        if (globalCaches.isEmpty())
            return;

        CacheStatisticsModeChangeMessage msg = new CacheStatisticsModeChangeMessage(UUID.randomUUID(), globalCaches, enabled);

        EnableStatisticsFuture fut = new EnableStatisticsFuture(msg.requestId());

        manageStatisticsFuts.put(msg.requestId(), fut);

        ctx.grid().context().discovery().sendCustomEvent(msg);

        fut.get();
    }

    /**
     * Clear statistics globally for the caches
     *
     * @param cacheNames Collection of cache names.
     */
    public void clearStatistics(Collection<String> cacheNames) throws IgniteCheckedException {
        Collection<IgniteInternalCache> caches = manageStatisticsCaches(cacheNames);

        Collection<String> globalCaches = new HashSet<>(U.capacity(caches.size()));

        for (IgniteInternalCache cache : caches)
            globalCaches.add(cache.name());

        if (globalCaches.isEmpty())
            return;

        CacheStatisticsClearMessage msg = new CacheStatisticsClearMessage(UUID.randomUUID(), globalCaches);

        EnableStatisticsFuture fut = new EnableStatisticsFuture(msg.requestId());

        manageStatisticsFuts.put(msg.requestId(), fut);

        ctx.grid().context().discovery().sendCustomEvent(msg);

        fut.get();
    }

    /**
     *
     */
    private Collection<IgniteInternalCache> manageStatisticsCaches(Collection<String> caches)
        throws IgniteCheckedException {
        assert caches != null;

        Collection<IgniteInternalCache> res = new ArrayList<>(caches.size());

        if (!cacheNames().containsAll(caches))
            throw new IgniteCheckedException("One or more cache descriptors not found [caches=" + caches + ']');

        for (String cacheName : caches) {
            IgniteInternalCache cache = cache(cacheName);

            if (cache == null)
                throw new IgniteCheckedException("Cache not found [cacheName=" + cacheName + ']');

            res.add(cache);
        }

        return res;
    }

    /**
     * @param obj Object to clone.
     * @return Object copy.
     * @throws IgniteCheckedException If failed.
     */
    public <T> T clone(final T obj) throws IgniteCheckedException {
        return withBinaryContext(new IgniteOutClosureX<T>() {
            @Override public T applyx() throws IgniteCheckedException {
                return U.unmarshal(marsh, U.marshal(marsh, obj), U.resolveClassLoader(ctx.config()));
            }
        });
    }

    /**
     * @param oldFormat Old format.
     */
    private CacheConfigurationSplitter splitter(boolean oldFormat) {
        // Requesting splitter with old format support is rare operation.
        // It's acceptable to allocate it every time by request.
        return oldFormat ? new CacheConfigurationSplitterOldFormat(enricher) : splitter;
    }

    /**
     * @return By default it returns splitter without old format configuration support.
     */
    public CacheConfigurationSplitter splitter() {
        return splitter(false);
    }

    /**
     * If not all nodes in cluster support splitted cache configurations it returns old format splitter.
     * In other case it returns default splitter.
     *
     * @return Cache configuration splitter with or without old format support depending on cluster state.
     */
    private CacheConfigurationSplitter backwardCompatibleSplitter() {
        IgniteDiscoverySpi spi = (IgniteDiscoverySpi)ctx.discovery().getInjectedDiscoverySpi();

        boolean oldFormat = !spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);

        return splitter(oldFormat);
    }

    /**
     * @return Cache configuration enricher.
     */
    public CacheConfigurationEnricher enricher() {
        return enricher;
    }

    /**
     * Pages list view supplier.
     *
     * @param filter Filter.
     */
    private Iterable<CachePagesListView> pagesListViewSupplier(Map<String, Object> filter) {
        Integer cacheGrpId = (Integer)filter.get(CachePagesListViewWalker.CACHE_GROUP_ID_FILTER);
        Integer partId = (Integer)filter.get(CachePagesListViewWalker.PARTITION_ID_FILTER);
        Integer bucketNum = (Integer)filter.get(CachePagesListViewWalker.BUCKET_NUMBER_FILTER);

        Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores = F.flat(F.iterator(
            filteredMap(cacheGrps, cacheGrpId).values(), grp -> grp.offheap().cacheDataStores(), true));

        return F.flat(F.iterator(dataStores, dataStore -> {
            RowStore rowStore = dataStore.rowStore();

            if (rowStore == null || !(dataStore instanceof GridCacheOffheapManager.GridCacheDataStore))
                return Collections.emptySet();

            PagesList pagesList = (PagesList)rowStore.freeList();

            if (bucketNum != null) {
                return bucketNum >= 0 && bucketNum < pagesList.bucketsCount() ?
                    Collections.singleton(new CachePagesListView(pagesList, bucketNum, dataStore.partId())) :
                    Collections.emptyList();
            }

            return IntStream.range(0, pagesList.bucketsCount())
                .mapToObj(bucket -> new CachePagesListView(pagesList, bucket, dataStore.partId()))
                .collect(toList());
        }, true, cacheDataStore -> partId == null || cacheDataStore.partId() == partId));
    }

    /**
     * Partition states view supplier.
     *
     * @param filter Filter.
     */
    private Iterable<PartitionStateView> partStatesViewSupplier(Map<String, Object> filter) {
        Integer cacheGrpId = (Integer)filter.get(PartitionStateViewWalker.CACHE_GROUP_ID_FILTER);
        UUID nodeId = (UUID)filter.get(PartitionStateViewWalker.NODE_ID_FILTER);
        Integer partId = (Integer)filter.get(PartitionStateViewWalker.PARTITION_ID_FILTER);

        return () -> F.concat(F.concat(F.iterator(filteredMap(cacheGrps, cacheGrpId).values(),
            grp -> F.iterator(filteredMap(grp.topology().partitionMap(false), nodeId).entrySet(),
                nodeToParts -> F.iterator(filteredMap(nodeToParts.getValue().map(),
                    partId == null || partId < 0 ? null : partId).entrySet(),
                    partToStates -> new PartitionStateView(
                        grp.groupId(),
                        nodeToParts.getKey(),
                        partToStates.getKey(),
                        partToStates.getValue(),
                        isPrimary(grp, nodeToParts.getKey(), partToStates.getKey())),
                    true),
                true),
            true)));
    }

    /**
     * Filter map by key.
     *
     * @param map Map.
     * @param key Filtering key.
     */
    private static <K, V> Map<K, V> filteredMap(Map<K, V> map, K key) {
        if (key == null)
            return map;

        V val = map.get(key);

        return val != null ? F.asMap(key, val) : Collections.emptyMap();
    }

    /**
     * @param grp Cache group.
     * @param nodeId Node id.
     * @param part Partition.
     */
    private static boolean isPrimary(CacheGroupContext grp, UUID nodeId, int part) {
        List<ClusterNode> nodes = grp.affinity().lastReadyAffinity().get(part);

        if (F.isEmpty(nodes))
            return false;

        ClusterNode primaryNode = nodes.get(0);

        return primaryNode != null && nodeId.equals(primaryNode.id());
    }

    /**
     * Recovery lifecycle for caches.
     */
    private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
        /**
         * Set of QuerySchema's saved on recovery. It's needed if cache query schema has changed after node joined to
         * topology.
         */
        private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>();

        /** Flag for stopping warm-up. */
        private final AtomicBoolean stopWarmUp = new AtomicBoolean();

        /** Currently running warm-up strategy. */
        private volatile WarmUpStrategy curWarmUpStrat;

        /** {@inheritDoc} */
        @Override public void onBaselineChange() {
            for (GridCacheAdapter<?, ?> cache : caches.values())
                if (cache != null)
                    cache.context().ttl().unregister();

            onKernalStopCaches(true);

            stopCaches(true);

            sharedCtx.database().cleanupRestoredCaches();
        }

        /** {@inheritDoc} */
        @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
            CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();

            cachesInfo.onStart(data);
        }

        /** {@inheritDoc} */
        @Override public void beforeBinaryMemoryRestore(
            IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
            for (DynamicCacheDescriptor cacheDescriptor : persistentCaches())
                preparePageStore(cacheDescriptor, true);
        }

        /** {@inheritDoc} */
        @Override public void afterBinaryMemoryRestore(
            IgniteCacheDatabaseSharedManager mgr,
            GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {

            Object consistentId = ctx.pdsFolderResolver().resolveFolders().consistentId();
            DetachedClusterNode clusterNode = new DetachedClusterNode(consistentId, ctx.nodeAttributes());

            for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
                boolean affNode = CU.affinityNode(clusterNode, cacheDescriptor.cacheConfiguration().getNodeFilter());

                if (!affNode)
                    continue;

                startCacheInRecoveryMode(cacheDescriptor);

                querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
            }
        }

        /** {@inheritDoc} */
        @Override public void afterLogicalUpdatesApplied(
            IgniteCacheDatabaseSharedManager mgr,
            GridCacheDatabaseSharedManager.RestoreLogicalState restoreState
        ) throws IgniteCheckedException {
            Collection<CacheGroupContext> cacheGrps = cacheGroups();

            restorePartitionStates(cacheGrps, restoreState.partitionRecoveryStates());

            // Start warm-up only after restoring memory storage, but before starting GridDiscoveryManager.
            if (!cacheGrps.isEmpty())
                startWarmUp();
        }

        /**
         * Restoring the state of partitions for cache groups.
         *
         * @param forGroups Cache groups.
         * @param partStates Partition states.
         * @throws IgniteCheckedException If failed.
         */
        private void restorePartitionStates(
            Collection<CacheGroupContext> forGroups,
            Map<GroupPartitionId, Integer> partStates
        ) throws IgniteCheckedException {
            long startRestorePart = U.currentTimeMillis();

            if (log.isInfoEnabled())
                log.info("Restoring partition state for local groups.");

            AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>();

            ExecutorService sysPool = ctx.pools().getSystemExecutorService();

            final int totalPart = forGroups.stream().mapToInt(grpCtx -> grpCtx.affinity().partitions()).sum();

            CountDownLatch completionLatch = new CountDownLatch(totalPart);

            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new ConcurrentHashMap<>();

            final int topPartRefLimit = 5;

            for (CacheGroupContext grpCtx : forGroups) {
                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
                    final int partId = i;

                    sysPool.execute(() -> {
                        GroupPartitionId grpPartId = new GroupPartitionId(grpCtx.groupId(), partId);

                        try {
                            long time = grpCtx.offheap().restoreStateOfPartition(partId, partStates.get(grpPartId));

                            if (log.isInfoEnabled()) {
                                T3<Long, Long, GroupPartitionId> curPart = new T3<>(time, U.currentTimeMillis(), grpPartId);

                                RestorePartitionStateThreadContext threadCtx = threadCtxs.computeIfAbsent(
                                    Thread.currentThread(),
                                    t -> new RestorePartitionStateThreadContext()
                                );

                                Comparator<T3<Long, Long, GroupPartitionId>> cmp = processedPartitionComparator();

                                threadCtx.topPartRef.updateAndGet(prev -> {
                                    if (prev == null ||
                                        cmp.compare(prev.last(), curPart) < 0) {
                                        SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(cmp);

                                        top.add(curPart);

                                        if (prev != null)
                                            top.addAll(prev);

                                        trimToSize(top, topPartRefLimit);

                                        return top;
                                    }
                                    else
                                        return prev;
                                });

                                threadCtx.incrementProcessedCnt();
                            }
                        }
                        catch (IgniteCheckedException | RuntimeException | Error e) {
                            U.error(log, "Failed to restore partition state for " +
                                "groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e);

                            IgniteCheckedException ex = e instanceof IgniteCheckedException
                                ? ((IgniteCheckedException)e)
                                : new IgniteCheckedException(e);

                            if (!restoreStateError.compareAndSet(null, ex))
                                restoreStateError.get().addSuppressed(ex);
                        }
                        finally {
                            completionLatch.countDown();
                        }
                    });
                }
            }

            boolean printTop = false;

            try {
                // Await completion restore state tasks in all stripes.
                if (!log.isInfoEnabled())
                    completionLatch.await();
                else {
                    long timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS;

                    while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                        if (log.isInfoEnabled()) {
                            SortedSet<T3<Long, Long, GroupPartitionId>> top =
                                collectTopProcessedParts(threadCtxs.values(), topPartRefLimit);

                            long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();

                            log.info("Restore partitions state progress [grpCnt=" +
                                (forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() +
                                ", partitionCnt=" + totalProcessed + '/' + totalPart + (top.isEmpty() ? "" :
                                ", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']');
                        }

                        timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5;

                        printTop = true;
                    }
                }
            }
            catch (InterruptedException e) {
                throw new IgniteInterruptedException(e);
            }

            for (CacheGroupContext grpCtx : forGroups)
                grpCtx.offheap().confirmPartitionStatesRestored();

            // Checking error after all task applied.
            if (restoreStateError.get() != null)
                throw restoreStateError.get();

            if (log.isInfoEnabled()) {
                SortedSet<T3<Long, Long, GroupPartitionId>> t =
                    printTop ? collectTopProcessedParts(threadCtxs.values(), topPartRefLimit) : null;

                long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();

                log.info("Finished restoring partition state for local groups [" +
                    "groupsProcessed=" + forGroups.size() +
                    ", partitionsProcessed=" + totalProcessed +
                    ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) +
                    (t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) +
                    "]");
            }
        }

        /**
         * Collects top processed partitions from thread local contexts of restore partition state process.
         *
         * @param threadCtxs Thread local contexts.
         * @param topPartRefLimit Limit of top partitions collection size.
         */
        private SortedSet<T3<Long, Long, GroupPartitionId>> collectTopProcessedParts(
            Collection<RestorePartitionStateThreadContext> threadCtxs,
            int topPartRefLimit
        ) {
            SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator());

            for (RestorePartitionStateThreadContext threadCtx : threadCtxs) {
                SortedSet<T3<Long, Long, GroupPartitionId>> threadTop = threadCtx.topPartRef.get();

                if (threadTop != null)
                    top.addAll(threadTop);
            }

            trimToSize(top, topPartRefLimit);

            return top;
        }

        /**
         * Start warming up sequentially for each persist data region.
         *
         * @throws IgniteCheckedException If failed.
         */
        private void startWarmUp() throws IgniteCheckedException {
            boolean start = false;

            try {
                // Collecting custom and default data regions.
                DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();

                List<DataRegionConfiguration> regCfgs =
                    new ArrayList<>(asList(dsCfg.getDefaultDataRegionConfiguration()));

                if (nonNull(dsCfg.getDataRegionConfigurations()))
                    regCfgs.addAll(asList(dsCfg.getDataRegionConfigurations()));

                // Warm-up start.
                Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrats = CU.warmUpStrategies(ctx);

                WarmUpConfiguration dfltWarmUpCfg = dsCfg.getDefaultWarmUpConfiguration();

                for (DataRegionConfiguration regCfg : regCfgs) {
                    if (stopWarmUp.get())
                        return;

                    if (!regCfg.isPersistenceEnabled())
                        continue;

                    WarmUpConfiguration warmUpCfg = nonNull(regCfg.getWarmUpConfiguration()) ?
                        regCfg.getWarmUpConfiguration() : dfltWarmUpCfg;

                    if (isNull(warmUpCfg))
                        continue;

                    WarmUpStrategy warmUpStrat = (curWarmUpStrat = warmUpStrats.get(warmUpCfg.getClass()));

                    DataRegion region = sharedCtx.database().dataRegion(regCfg.getName());

                    if (!stopWarmUp.get()) {
                        if (!start && (start = true) && log.isInfoEnabled())
                            log.info("Warm-up start.");

                        if (log.isInfoEnabled()) {
                            log.info("Start warm-up for data region [name=" + regCfg.getName()
                                + ", warmUpStrategy=" + warmUpStrat + ", warmUpConfig=" + warmUpCfg + ", isDefault="
                                + (warmUpCfg == dfltWarmUpCfg) + ']');
                        }

                        warmUpStrat.warmUp(warmUpCfg, region);

                        if (log.isInfoEnabled())
                            log.info("Finish of warm-up data region: " + region.config().getName());
                    }
                }
            }
            finally {
                if (stopWarmUp.get() && log.isInfoEnabled())
                    log.info("Warm-up stop.");
                else if (start && log.isInfoEnabled())
                    log.info("Warm-up finish.");

                stopWarmUp.set(true);
                curWarmUpStrat = null;
            }
        }
    }

    /**
     * Stop warming up and current running strategy.
     *
     * @return {@code true} if stopped by this call.
     * @throws IgniteCheckedException If there is an error when stopping warm-up.
     */
    public boolean stopWarmUp() throws IgniteCheckedException {
        if (recovery.stopWarmUp.compareAndSet(false, true)) {
            WarmUpStrategy strat = recovery.curWarmUpStrat;

            if (log.isInfoEnabled())
                log.info("Stopping warm-up strategy: " + strat);

            if (nonNull(strat))
                strat.stop();

            return true;
        }

        return false;
    }

    /**
     * Handle of fail during cache start.
     *
     * @param <T> Type of started data.
     */
    private interface StartCacheFailHandler<T, R> {
        /**
         * Handle of fail.
         *
         * @param data Start data.
         * @param startCacheOperation Operation for start cache.
         * @throws IgniteCheckedException if failed.
         */
        void handle(T data, IgniteThrowableFunction<T, R> startCacheOperation) throws IgniteCheckedException;
    }

    /**
     *
     */
    private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
        /** */
        private UUID id;

        /**
         * @param id Future ID.
         */
        private DynamicCacheStartFuture(UUID id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
            // Make sure to remove future before completion.
            pendingFuts.remove(id, this);

            context().exchange().exchangerUpdateHeartbeat();

            return super.onDone(res, err);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(DynamicCacheStartFuture.class, this);
        }
    }

    /**
     *
     */
    private class TemplateConfigurationFuture extends GridFutureAdapter<Object> {
        /** Start ID. */
        @GridToStringInclude
        private IgniteUuid deploymentId;

        /** Cache name. */
        private String cacheName;

        /**
         * @param cacheName Cache name.
         * @param deploymentId Deployment ID.
         */
        private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) {
            this.deploymentId = deploymentId;
            this.cacheName = cacheName;
        }

        /**
         * @return Start ID.
         */
        public IgniteUuid deploymentId() {
            return deploymentId;
        }

        /** {@inheritDoc} */
        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
            // Make sure to remove future before completion.
            pendingTemplateFuts.remove(cacheName, this);

            return super.onDone(res, err);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(TemplateConfigurationFuture.class, this);
        }
    }

    /**
     * Enable statistics future.
     */
    private class EnableStatisticsFuture extends GridFutureAdapter<Void> {
        /** */
        private UUID id;

        /**
         * @param id Future ID.
         */
        private EnableStatisticsFuture(UUID id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
            // Make sure to remove future before completion.
            manageStatisticsFuts.remove(id, this);

            return super.onDone(res, err);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(EnableStatisticsFuture.class, this);
        }
    }

    /**
     * Creation of a string representation of the top (descending) partitions, the processing of which took the most time.
     *
     * @param top Top (ascending) processed partitions.
     * @param groups Cache group contexts.
     * @return String representation.
     */
    static String toStringTopProcessingPartitions(
        SortedSet<T3<Long, Long, GroupPartitionId>> top,
        Collection<CacheGroupContext> groups
    ) {
        if (top.isEmpty())
            return "[]";

        StringJoiner sj0 = new StringJoiner(", ", "[", "]");

        TreeMap<Long, List<GroupPartitionId>> top0 = top.stream().collect(groupingBy(
            T3::get1,
            TreeMap::new,
            mapping(T3::get3, toList())
        ));

        for (Map.Entry<Long, List<GroupPartitionId>> e0 : top0.descendingMap().entrySet()) {
            Map<Integer, List<GroupPartitionId>> byCacheGrpId =
                e0.getValue().stream().collect(groupingBy(GroupPartitionId::getGroupId));

            StringJoiner sj1 = new StringJoiner(", ", "[", "]");

            for (Map.Entry<Integer, List<GroupPartitionId>> e1 : byCacheGrpId.entrySet()) {
                CacheGroupContext grp = groups.stream().filter(g -> g.groupId() == e1.getKey()).findAny().orElse(null);

                String parts = e1.getValue().stream().map(GroupPartitionId::getPartitionId).sorted()
                    .map(p -> grp == null ? p.toString() : p + ":" + grp.topology().localPartition(p).fullSize())
                    .collect(Collectors.joining(", ", "[", "]"));

                sj1.add("[grp=" + (grp == null ? e1.getKey() : grp.cacheOrGroupName()) + ", part=" + parts + ']');
            }

            sj0.add("[time=" + U.humanReadableDuration(e0.getKey()) + ' ' + sj1.toString() + ']');
        }

        return sj0.toString();
    }

    /**
     * Trimming the set to the required size.
     * Removing items will be in ascending order.
     *
     * @param set Set.
     * @param size Size.
     */
    static <E> void trimToSize(SortedSet<E> set, int size) {
        while (set.size() > size)
            set.remove(set.first());
    }

    /**
     * Comparator of processed partitions.
     * T3 -> 1 - duration, 2 - timestamp, 3 - partition of group.
     * Sort order: duration -> timestamp (reversed order) -> partition of group.
     *
     * @return Comparator.
     */
    static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() {
        Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1);

        return comp.thenComparing(T3::get2, Comparator.reverseOrder()).thenComparing(T3::get3);
    }

    /**
     * Thread local context of restore partition state progress.
     */
    private static class RestorePartitionStateThreadContext {
        /** Field updater. */
        static final AtomicLongFieldUpdater<RestorePartitionStateThreadContext> PROCESSED_CNT_UPD =
            AtomicLongFieldUpdater.newUpdater(RestorePartitionStateThreadContext.class, "processedCnt");

        /** Top partitions by processing time. */
        final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef =
            new AtomicReference<>();

        /** Processed partitions count. It is always updated from the same thread. */
        volatile long processedCnt = 0;

        /**
         * Increment {@code processedCnt} field.
         */
        void incrementProcessedCnt() {
            PROCESSED_CNT_UPD.incrementAndGet(this);
        }
    }
}
